From 7f88972af5bddcdc77d7b976a0ca94be2f9e37ae Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Fri, 3 Apr 2026 19:56:44 +0200 Subject: [PATCH v5 3/4] Support snapshot resets in parallel concurrent index builds Extend periodic snapshot reset support to parallel builds, previously limited to non-parallel operations. This allows the xmin horizon to advance during parallel concurrent index builds as well. The main obstacle to applying that technique to parallel builds was the requirement to wait until worker processes have restored their initial snapshot from the leader. To address this, the following changes are applied: - add infrastructure to track snapshot restoration in parallel workers - extend parallel scan initialization to support periodic snapshot resets - wait for parallel workers to restore their initial snapshots before proceeding with the scan - remove the restriction that GetLatestSnapshot cannot be called in parallel mode --- src/backend/access/brin/brin.c | 54 ++++++++++++---- src/backend/access/gin/gininsert.c | 51 +++++++++++---- src/backend/access/heap/heapam_handler.c | 7 +-- src/backend/access/nbtree/nbtsort.c | 51 +++++++++++++-- src/backend/access/table/tableam.c | 37 +++++++++-- src/backend/access/transam/parallel.c | 62 +++++++++++++++++-- src/backend/executor/nodeSeqscan.c | 3 +- src/backend/executor/nodeTidrangescan.c | 3 +- src/backend/utils/time/snapmgr.c | 8 --- src/include/access/parallel.h | 3 +- src/include/access/relscan.h | 1 + src/include/access/tableam.h | 5 +- src/include/catalog/index.h | 4 +- .../expected/cic_reset_snapshots.out | 25 +++++++- .../sql/cic_reset_snapshots.sql | 5 +- 15 files changed, 259 insertions(+), 60 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 09741186264..ff57fe61add 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -1275,6 +1275,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = 
idxtuples; + InvalidateCatalogSnapshot(); + Assert(!IndexBuildResetsSnapshots(indexInfo) || !TransactionIdIsValid(MyProc->xmin)); + return result; } @@ -2394,6 +2397,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -2411,12 +2415,16 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, scantuplesortstates = leaderparticipates ? request + 1 : request; + reset_snapshot = isconcurrent && !IsolationUsesXactSnapshot(); + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot may be reset periodically (in non-xact-snapshot isolation + * modes) to allow the xmin horizon to advance. 
*/ if (!isconcurrent) { @@ -2424,10 +2432,15 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); - PushActiveSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); } /* @@ -2473,7 +2486,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -2499,7 +2512,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromBrinShared(brinshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2561,6 +2575,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = brinleader; + /* + * In case of concurrent build with snapshot resets, wait until all + * workers imported initial snapshot. + */ + if (reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _brin_leader_participate_as_worker(buildstate, heap, index); @@ -2569,9 +2590,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + + InvalidateCatalogSnapshot(); + Assert(!reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -2592,8 +2617,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) + if (brinleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(brinleader->snapshot)) UnregisterSnapshot(brinleader->snapshot); DestroyParallelContext(brinleader->pcxt); ExitParallelMode(); @@ -2794,7 +2818,7 @@ _brin_parallel_merge(BrinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. + * brin index build. 
*/ static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) @@ -2843,6 +2867,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, { SortCoordinate coordinate; TableScanDesc scan; + ParallelTableScanDesc pscan; double reltuples; IndexInfo *indexInfo; @@ -2859,13 +2884,18 @@ _brin_parallel_scan_and_build(BrinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); indexInfo->ii_Concurrent = brinshared->isconcurrent; + pscan = ParallelTableScanFromBrinShared(brinshared); scan = table_beginscan_parallel(heap, - ParallelTableScanFromBrinShared(brinshared), + pscan, SO_NONE); reltuples = table_index_build_scan(heap, index, indexInfo, true, true, brinbuildCallbackParallel, state, scan); + InvalidateCatalogSnapshot(); + if (pscan->phs_reset_snapshot) + PopActiveSnapshot(); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); /* insert the last item */ form_and_spill_tuple(state); @@ -2888,6 +2918,9 @@ _brin_parallel_scan_and_build(BrinBuildState *state, ConditionVariableSignal(&brinshared->workersdonecv); tuplesort_end(state->bs_sortstate); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); + if (pscan->phs_reset_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); } /* @@ -2964,7 +2997,6 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); - /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 5b035aa5dec..3251db0e617 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -809,6 +809,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = 
buildstate.indtuples; + Assert(!IndexBuildResetsSnapshots(indexInfo) || !TransactionIdIsValid(MyProc->xmin)); return result; } @@ -957,6 +958,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -973,12 +975,16 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, scantuplesortstates = leaderparticipates ? request + 1 : request; + reset_snapshot = isconcurrent && !IsolationUsesXactSnapshot(); + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot may be reset periodically (in non-xact-snapshot isolation + * modes) to allow the xmin horizon to advance. 
*/ if (!isconcurrent) { @@ -986,10 +992,15 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); - PushActiveSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); } /* @@ -1035,7 +1046,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -1060,7 +1071,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1118,6 +1130,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = ginleader; + /* + * In case of concurrent build with snapshot resets, wait until all + * workers imported initial snapshot. + */ + if (reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _gin_leader_participate_as_worker(buildstate, heap, index); @@ -1126,9 +1145,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -1149,8 +1171,7 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) + if (ginleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(ginleader->snapshot)) UnregisterSnapshot(ginleader->snapshot); DestroyParallelContext(ginleader->pcxt); ExitParallelMode(); @@ -1826,7 +1847,7 @@ _gin_parallel_merge(GinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. + * gin index build. 
*/ static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) @@ -2058,6 +2079,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, { SortCoordinate coordinate; TableScanDesc scan; + ParallelTableScanDesc pscan; double reltuples; IndexInfo *indexInfo; @@ -2088,13 +2110,18 @@ _gin_parallel_scan_and_build(GinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); indexInfo->ii_Concurrent = ginshared->isconcurrent; + pscan = ParallelTableScanFromGinBuildShared(ginshared); scan = table_beginscan_parallel(heap, - ParallelTableScanFromGinBuildShared(ginshared), + pscan, SO_NONE); reltuples = table_index_build_scan(heap, index, indexInfo, true, progress, ginBuildCallbackParallel, state, scan); + InvalidateCatalogSnapshot(); + if (pscan->phs_reset_snapshot) + PopActiveSnapshot(); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); /* write remaining accumulated entries */ ginFlushBuildState(state, index); @@ -2124,6 +2151,9 @@ _gin_parallel_scan_and_build(GinBuildState *state, ConditionVariableSignal(&ginshared->workersdonecv); tuplesort_end(state->bs_sortstate); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); + if (pscan->phs_reset_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); } /* @@ -2219,7 +2249,6 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); - /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index faf3e04c449..1184ed086fe 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1213,7 +1213,8 @@ heapam_index_build_range_scan(Relation heapRelation, * SnapshotAny because 
we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, or during bootstrap, we take a regular MVCC snapshot - * and index whatever's live according to that. + * and index whatever's live according to that while that snapshot is reset + * every so often (in the case of non-unique index). */ OldestXmin = InvalidTransactionId; @@ -1221,8 +1222,6 @@ heapam_index_build_range_scan(Relation heapRelation, * For unique indexes we need a consistent snapshot for the whole scan. * Resetting snapshots also doesn't work in xact-snapshot isolation modes, * because those keep a registered transaction snapshot for the whole xact. - * In the case of parallel scan, some additional infrastructure is required - * to perform a scan with SO_RESET_SNAPSHOT which is not yet ready. */ reset_snapshots = IndexBuildResetsSnapshots(indexInfo) && !is_system_catalog; /* just for the case */ @@ -1284,7 +1283,7 @@ heapam_index_build_range_scan(Relation heapRelation, Assert(!IsBootstrapProcessingMode()); Assert(allow_sync); snapshot = scan->rs_snapshot; - if (IsMVCCSnapshot(snapshot)) + if (!reset_snapshots && IsMVCCSnapshot(snapshot)) { /* Don't expose SnapshotAny to SQL run by predicates/expressions. */ PushActiveSnapshot(snapshot); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 8d804d6bcfe..6bb365c951d 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1419,6 +1419,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -1436,12 +1437,24 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) scantuplesortstates = leaderparticipates ? 
request + 1 : request; + /* + * For concurrent non-unique index builds, we can periodically reset + * snapshots to allow the xmin horizon to advance. This is safe since + * these builds don't require a consistent view across the entire scan. + * Unique indexes still need a stable snapshot to properly enforce + * uniqueness constraints. Isolation modes where + * IsolationUsesXactSnapshot() is true also prevent resetting because + * they keep a registered transaction snapshot for the whole transaction. + */ + reset_snapshot = isconcurrent && !IsolationUsesXactSnapshot() && !btspool->isunique; + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * live according to that, while that snapshot may be reset periodically + * for non-unique indexes in non-xact-snapshot isolation modes. 
*/ if (!isconcurrent) { @@ -1449,6 +1462,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); @@ -1509,7 +1527,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -1536,7 +1554,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btshared->brokenhotchain = false; table_parallelscan_initialize(btspool->heap, ParallelTableScanFromBTShared(btshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1612,6 +1631,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Save leader state now that it's clear build will be parallel */ buildstate->btleader = btleader; + /* + * In the case of concurrent build, snapshots are going to be reset periodically. + * Wait until all workers imported initial snapshot. + */ + if (reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _bt_leader_participate_as_worker(buildstate); @@ -1620,9 +1646,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + + InvalidateCatalogSnapshot(); + Assert(!reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -1644,7 +1674,7 @@ _bt_end_parallel(BTLeader *btleader) InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) + if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot)) UnregisterSnapshot(btleader->snapshot); DestroyParallelContext(btleader->pcxt); ExitParallelMode(); @@ -1894,6 +1924,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, SortCoordinate coordinate; BTBuildState buildstate; TableScanDesc scan; + ParallelTableScanDesc pscan; double reltuples; IndexInfo *indexInfo; @@ -1948,12 +1979,17 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); indexInfo->ii_Concurrent = btshared->isconcurrent; + pscan = ParallelTableScanFromBTShared(btshared); scan = table_beginscan_parallel(btspool->heap, - ParallelTableScanFromBTShared(btshared), + pscan, SO_NONE); reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo, true, progress, _bt_build_callback, &buildstate, scan); + InvalidateCatalogSnapshot(); + if (pscan->phs_reset_snapshot) + PopActiveSnapshot(); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); /* Execute this worker's part of the sort */ if (progress) @@ -1989,4 +2025,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, tuplesort_end(btspool->sortstate); if (btspool2) tuplesort_end(btspool2->sortstate); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); + if (pscan->phs_reset_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); } diff --git 
a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 68ff0966f1c..cf337eda5f6 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) { Size sz = 0; - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) sz = add_size(sz, EstimateSnapshotSpace(snapshot)); else - Assert(snapshot == SnapshotAny); + Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot); sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel)); @@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot) + Snapshot snapshot, bool reset_snapshot) { Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); pscan->phs_snapshot_off = snapshot_off; - if (IsMVCCSnapshot(snapshot)) + /* + * Initialize parallel scan description. For normal scans with a regular + * MVCC snapshot, serialize the snapshot info. For scans that use periodic + * snapshot resets, mark the scan accordingly. 
+ */ + if (reset_snapshot) + { + Assert(snapshot == InvalidSnapshot); + pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = true; + INJECTION_POINT("table_parallelscan_initialize", NULL); + } + else if (IsMVCCSnapshot(snapshot)) { SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off); pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = false; } else { Assert(snapshot == SnapshotAny); + Assert(!reset_snapshot); pscan->phs_snapshot_any = true; + pscan->phs_reset_snapshot = false; } } @@ -172,7 +187,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan, Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator)); - if (!pscan->phs_snapshot_any) + /* + * For scans that + * use periodic snapshot resets, mark the scan accordingly and use the active + * snapshot as the initial state. + */ + if (pscan->phs_reset_snapshot) + { + Assert(ActiveSnapshotSet()); + internal_flags |= SO_RESET_SNAPSHOT; + /* Start with current active snapshot. */ + snapshot = GetActiveSnapshot(); + } + else if (!pscan->phs_snapshot_any) { /* Snapshot was serialized -- restore it */ snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 89e9d224eec..6f50f5aff9c 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -79,6 +79,7 @@ #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D) #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E) #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F) +#define PARALLEL_KEY_SNAPSHOT_RESTORED UINT64CONST(0xFFFFFFFFFFFF0010) /* Fixed-size parallel state. 
*/ typedef struct FixedParallelState @@ -308,6 +309,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool), + pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate how much we'll need for the entrypoint info. */ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + strlen(pcxt->function_name) + 2); @@ -379,6 +384,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *entrypointstate; char *uncommittedenumsspace; char *clientconninfospace; + bool *snapshot_set_flag_space; Size lnamelen; /* Serialize shared libraries we have loaded. */ @@ -494,6 +500,19 @@ InitializeParallelDSM(ParallelContext *pcxt) strcpy(entrypointstate, pcxt->library_name); strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); + + /* + * Establish dynamic shared memory to pass information about importing + * of snapshot. + */ + snapshot_set_flag_space = + shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers)); + for (i = 0; i < pcxt->nworkers; ++i) + { + pcxt->worker[i].snapshot_restored = &snapshot_set_flag_space[i]; + *pcxt->worker[i].snapshot_restored = false; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space); } /* Update nworkers_to_launch, in case we changed nworkers above. */ @@ -670,6 +689,10 @@ LaunchParallelWorkers(ParallelContext *pcxt) * Wait for all workers to attach to their error queues, and throw an error if * any worker fails to do this. * + * wait_for_snapshot: if true, a worker counts as attached only once it has also + * restored its snapshot. This is needed when using periodic snapshot resets to ensure all + * workers have a valid initial snapshot before proceeding with the scan. 
+ * * Callers can assume that if this function returns successfully, then the * number of workers given by pcxt->nworkers_launched have initialized and * attached to their error queues. Whether or not these workers are guaranteed @@ -699,7 +722,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) * call this function at all. */ void -WaitForParallelWorkersToAttach(ParallelContext *pcxt) +WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot) { int i; @@ -743,9 +766,24 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) mq = shm_mq_get_queue(pcxt->worker[i].error_mqh); if (shm_mq_get_sender(mq) != NULL) { - /* Yes, so it is known to be attached. */ - pcxt->known_attached_workers[i] = true; - ++pcxt->nknown_attached_workers; + pg_read_barrier(); + if (!wait_for_snapshot || *pcxt->worker[i].snapshot_restored) + { + /* Worker is known to be attached. */ + pcxt->known_attached_workers[i] = true; + ++pcxt->nknown_attached_workers; + } + else + { + /* + * Worker attached but hasn't restored its snapshot yet. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10, WAIT_EVENT_BGWORKER_STARTUP); + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } } } else if (status == BGWH_STOPPED) @@ -788,6 +826,16 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) break; } } + + /* Set snapshot restored flag to false. 
*/ + if (pcxt->nworkers > 0) + { + bool *snapshot_restored_space; + snapshot_restored_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + for (i = 0; i < pcxt->nworkers; ++i) + snapshot_restored_space[i] = false; + } } /* @@ -1304,6 +1352,7 @@ ParallelWorkerMain(Datum main_arg) shm_toc *toc; FixedParallelState *fps; char *error_queue_space; + bool *snapshot_restored_space; shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; @@ -1507,6 +1556,11 @@ ParallelWorkerMain(Datum main_arg) fps->parallel_leader_pgproc); PushActiveSnapshot(asnapshot); + /* Snapshot is restored, set flag to make leader know about it. */ + snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + snapshot_restored_space[ParallelWorkerNumber] = true; + pg_write_barrier(); + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 5bcb0a861d7..122d7458561 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -404,7 +404,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c index b387ed6c308..07e575164a1 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -488,7 +488,8 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt) pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, 
node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 10fe18df2e7..f22da271b9e 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -353,14 +353,6 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { - /* - * We might be able to relax this, but nothing that could otherwise work - * needs it. - */ - if (IsInParallelMode()) - elog(ERROR, - "cannot update SecondarySnapshot during a parallel operation"); - /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 60f857675e0..9a3e14e0c0e 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -28,6 +28,7 @@ typedef struct ParallelWorkerInfo { BackgroundWorkerHandle *bgwhandle; shm_mq_handle *error_mqh; + bool *snapshot_restored; } ParallelWorkerInfo; typedef struct ParallelContext @@ -67,7 +68,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch); extern void LaunchParallelWorkers(ParallelContext *pcxt); -extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); +extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot); extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 2ea06a67a63..8d54fdc169e 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -87,6 +87,7 @@ typedef struct ParallelTableScanDescData 
RelFileLocator phs_locator; /* physical relation to scan */ bool phs_syncscan; /* report location to syncscan logic? */ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */ + bool phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? */ Size phs_snapshot_off; /* data for snapshot */ } ParallelTableScanDescData; typedef struct ParallelTableScanDescData *ParallelTableScanDesc; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 3560ba40fc2..219525dbcb0 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1212,7 +1212,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot); */ extern void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot); + Snapshot snapshot, + bool reset_snapshot); /* * Begin a parallel scan. `pscan` needs to have been initialized with @@ -1866,7 +1867,7 @@ table_scan_analyze_next_tuple(TableScanDesc scan, * This only really makes sense for heap AM, it might need to be generalized * for other AMs later. * - * In case of non-unique index and non-parallel concurrent build, + * In case of a concurrent build of a non-unique index (parallel or not), * concurrent_index_reset_snapshot_every_n_pages is applied for the scan. * That leads to changing snapshots on the fly to allow xmin horizon propagate. 
*/ diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 3d93232361f..60f8356ed82 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -35,13 +35,11 @@ typedef struct AttrMap AttrMap; * - the index is non-unique (unique needs consistent snapshot) * - isolation level is not REPEATABLE READ/SERIALIZABLE (those keep a * registered transaction snapshot) - * - the build is not parallel (parallel needs separate infrastructure) */ #define IndexBuildResetsSnapshots(indexInfo) \ ((indexInfo)->ii_Concurrent && \ !(indexInfo)->ii_Unique && \ - !IsolationUsesXactSnapshot() && \ - !(indexInfo)->ii_ParallelWorkers) + !IsolationUsesXactSnapshot()) /* Action code for index_set_state_flags */ typedef enum diff --git a/src/test/modules/injection_points/expected/cic_reset_snapshots.out b/src/test/modules/injection_points/expected/cic_reset_snapshots.out index ed4aaaf3463..e5a8a7f3a79 100644 --- a/src/test/modules/injection_points/expected/cic_reset_snapshots.out +++ b/src/test/modules/injection_points/expected/cic_reset_snapshots.out @@ -17,6 +17,12 @@ SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice' (1 row) +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); + injection_points_attach +------------------------- + +(1 row) + CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); INSERT INTO cic_reset_snap.tbl SELECT i, i * I FROM generate_series(1, 200) s(i); @@ -72,30 +78,45 @@ NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + injection_points_detach +------------------------- + +(1 row) + CREATE UNIQUE INDEX CONCURRENTLY idx ON 
cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(MOD(i, 2), j) WHERE MOD(i, 2) = 0; +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable(i); NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable_no_param(); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i DESC NULLS LAST); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl USING BRIN(i); 
+NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; BEGIN TRANSACTION; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql index 553787539b3..a85b0cbf575 100644 --- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql +++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql @@ -3,7 +3,7 @@ CREATE EXTENSION injection_points; SELECT injection_points_set_local(); SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice'); SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'); - +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); @@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- 2.43.0