From 39b26f7deb07901048fdf2ee6193b38f47fdcef3 Mon Sep 17 00:00:00 2001
From: Mikhail Nikalayeu
Date: Wed, 14 Jan 2026 18:45:56 +0300
Subject: [PATCH v2 3/4] Support snapshot resets in parallel concurrent index
 builds

Extend periodic snapshot reset support to parallel builds, which was
previously limited to non-parallel operations. This allows the xmin
horizon to advance during parallel concurrent index builds as well.

The main obstacle to applying that technique to parallel builds was the
need to wait until worker processes have restored their initial snapshot
from the leader. To address this, the following changes are applied:

- add infrastructure to track snapshot restoration in parallel workers
- extend parallel scan initialization to support periodic snapshot resets
- wait for parallel workers to restore their initial snapshots before
  proceeding with the scan
- relax the limitation that prevented parallel workers from calling
  GetLatestSnapshot
---
 src/backend/access/brin/brin.c          | 54 ++++++++++-------
 src/backend/access/gin/gininsert.c      | 51 +++++++++-------
 src/backend/access/heap/heapam_handler.c | 12 ++--
 src/backend/access/nbtree/nbtsort.c     | 60 ++++++++++++++-----
 src/backend/access/table/tableam.c      | 37 ++++++++++--
 src/backend/access/transam/parallel.c   | 49 +++++++++++++--
 src/backend/catalog/index.c             |  2 +-
 src/backend/executor/nodeSeqscan.c      |  3 +-
 src/backend/executor/nodeTidrangescan.c |  3 +-
 src/backend/utils/time/snapmgr.c        |  8 ---
 src/include/access/parallel.h           |  3 +-
 src/include/access/relscan.h            |  1 +
 src/include/access/tableam.h            |  5 +-
 .../sql/cic_reset_snapshots.sql         |  5 +-
 14 files changed, 208 insertions(+), 85 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 498eb2b991b..c1839e15c9d 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -145,7 +145,6 @@ typedef struct BrinLeader
 	 */
 	BrinShared *brinshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } BrinLeader;
@@ -233,7 +232,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
 static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap,
 								 Relation index, bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
-static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _brin_parallel_estimate_shared(Relation heap);
 static double _brin_parallel_heapscan(BrinBuildState *state);
 static double _brin_parallel_merge(BrinBuildState *state);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
@@ -1224,7 +1223,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		reltuples = _brin_parallel_merge(state);
 
 		_brin_end_parallel(state->bs_leader, state);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 	else						/* no parallel index build */
 	{
@@ -1257,7 +1255,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			brin_fill_empty_ranges(state,
 								   state->bs_currRangeStart,
 								   state->bs_maxRangeStart);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	/* release resources */
@@ -1273,6 +1270,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	result->heap_tuples = reltuples;
 	result->index_tuples = idxtuples;
 
+	InvalidateCatalogSnapshot();
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+
 	return result;
 }
 
@@ -2385,7 +2385,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 {
	ParallelContext *pcxt;
	int			scantuplesortstates;
-	Snapshot	snapshot;
	Size		estbrinshared;
	Size		estsort;
	BrinShared *brinshared;
@@ -2416,25 +2415,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation. In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as the
+	 * active one. We then index whatever's live according to that snapshot,
+	 * which is periodically reset.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
 	 */
-	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	estbrinshared = _brin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -2474,8 +2473,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -2500,7 +2497,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2546,7 +2544,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	brinleader->nparticipanttuplesorts++;
 	brinleader->brinshared = brinshared;
 	brinleader->sharedsort = sharedsort;
-	brinleader->snapshot = snapshot;
 	brinleader->walusage = walusage;
 	brinleader->bufferusage = bufferusage;
 
@@ -2562,6 +2559,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = brinleader;
 
+	/*
+	 * In a concurrent build, snapshots are reset periodically. We need to
+	 * wait until all workers have imported their initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_brin_leader_participate_as_worker(buildstate, heap, index);
@@ -2570,9 +2574,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return. Make
 	 * sure that the failure-to-start case will not hang forever.
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!isconcurrent) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + + InvalidateCatalogSnapshot(); + Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -2593,9 +2601,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); DestroyParallelContext(brinleader->pcxt); ExitParallelMode(); } @@ -2795,14 +2800,14 @@ _brin_parallel_merge(BrinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. + * brin index build. */ static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_brin_parallel_estimate_shared(Relation heap) { /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* @@ -2964,6 +2969,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); + if (brinshared->isconcurrent) + { + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); + PushActiveSnapshot(GetTransactionSnapshot()); + } /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index b0087cb1a62..d46ad210651 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -135,7 +135,6 @@ typedef struct GinLeader */ GinBuildShared *ginshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } GinLeader; @@ -185,7 +184,7 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _gin_parallel_estimate_shared(Relation heap); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -752,7 +751,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) reltuples = _gin_parallel_merge(state); _gin_end_parallel(state->bs_leader, state); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } else /* no parallel index build */ { @@ -776,7 +774,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) list, nlist, &buildstate.buildStats); } MemoryContextSwitchTo(oldCtx); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } MemoryContextDelete(buildstate.funcCtx); @@ -806,6 +803,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = buildstate.indtuples; + Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); return result; } 
@@ -940,7 +938,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estginshared;
 	Size		estsort;
 	GinBuildShared *ginshared;
@@ -970,25 +967,25 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation. In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as the
+	 * active one. We then index whatever's live according to that snapshot,
+	 * which is periodically reset.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
 	 */
-	estginshared = _gin_parallel_estimate_shared(heap, snapshot);
+	estginshared = _gin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estginshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -1028,8 +1025,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -1053,7 +1048,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromGinBuildShared(ginshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1095,7 +1091,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	ginleader->nparticipanttuplesorts++;
 	ginleader->ginshared = ginshared;
 	ginleader->sharedsort = sharedsort;
-	ginleader->snapshot = snapshot;
 	ginleader->walusage = walusage;
 	ginleader->bufferusage = bufferusage;
 
@@ -1111,6 +1106,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = ginleader;
 
+	/*
+	 * In a concurrent build, snapshots are reset periodically. We need to
+	 * wait until all workers have imported their initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_gin_leader_participate_as_worker(buildstate, heap, index);
@@ -1119,9 +1121,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return. Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
+	InvalidateCatalogSnapshot();
+	Assert(!ginleader->ginshared->isconcurrent || !TransactionIdIsValid(MyProc->xmin));
 }
 
 /*
@@ -1142,9 +1147,6 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
 	for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(ginleader->snapshot))
-		UnregisterSnapshot(ginleader->snapshot);
 	DestroyParallelContext(ginleader->pcxt);
 	ExitParallelMode();
 }
@@ -1819,14 +1821,14 @@ _gin_parallel_merge(GinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * gin index build based on the snapshot its parallel scan will use.
+ * gin index build.
  */
 static Size
-_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_gin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -2211,6 +2213,13 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	_gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
 								 heapRel, indexRel, sortmem, false);
 
+	if (ginshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xmin));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 16c460aa23f..24a554a10d4 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1251,14 +1251,13 @@ heapam_index_build_range_scan(Relation heapRelation,
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
 	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
-	 * and index whatever's live according to that.
+	 * and index whatever's live according to that, while the snapshot is
+	 * reset every so often (in the case of a non-unique index).
 	 */
 	OldestXmin = InvalidTransactionId;
 
 	/*
 	 * For unique index we need consistent snapshot for the whole scan.
-	 * In case of parallel scan some additional infrastructure required
-	 * to perform scan with SO_RESET_SNAPSHOT which is not yet ready.
*/ reset_snapshots = indexInfo->ii_Concurrent && !indexInfo->ii_Unique && @@ -1320,8 +1319,11 @@ heapam_index_build_range_scan(Relation heapRelation, Assert(!IsBootstrapProcessingMode()); Assert(allow_sync); snapshot = scan->rs_snapshot; - PushActiveSnapshot(snapshot); - need_pop_active_snapshot = true; + if (!reset_snapshots) + { + PushActiveSnapshot(snapshot); + need_pop_active_snapshot = true; + } } hscan = (HeapScanDesc) scan; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 6fc72bd2c94..9c395d1ac38 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -324,22 +324,20 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo) RelationGetRelationName(index)); reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Finish the build by (1) completing the sort of the spool file, (2) * inserting the sorted tuples into btree pages and (3) building the upper * levels. Finally, it may also be necessary to end use of parallelism. */ - _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_ParallelWorkers && indexInfo->ii_Concurrent); + _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_Unique && indexInfo->ii_Concurrent); _bt_spooldestroy(buildstate.spool); if (buildstate.spool2) _bt_spooldestroy(buildstate.spool2); if (buildstate.btleader) _bt_end_parallel(buildstate.btleader); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); result = palloc_object(IndexBuildResult); @@ -488,8 +486,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate, reltuples = _bt_parallel_heapscan(buildstate, &indexInfo->ii_BrokenHotChain); InvalidateCatalogSnapshot(); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Set the progress target for the next phase. Reset the block number @@ -1421,6 +1418,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -1438,12 +1436,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) scantuplesortstates = leaderparticipates ? request + 1 : request; + /* + * For concurrent non-unique index builds, we can periodically reset snapshots + * to allow the xmin horizon to advance. This is safe since these builds don't + * require a consistent view across the entire scan. Unique indexes still need + * a stable snapshot to properly enforce uniqueness constraints. + */ + reset_snapshot = isconcurrent && !btspool->isunique; + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). 
In a
 	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * live according to that, while the snapshot may be reset periodically
+	 * in the case of a non-unique index.
 	 */
 	if (!isconcurrent)
 	{
@@ -1451,6 +1458,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
+	else if (reset_snapshot)
+	{
+		snapshot = InvalidSnapshot;
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 	else
 	{
 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
@@ -1511,7 +1523,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
+		if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
@@ -1538,7 +1550,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot,
+								  reset_snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1614,6 +1627,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->btleader = btleader;
 
+	/*
+	 * In a concurrent build, snapshots are reset periodically. Wait until
+	 * all workers have imported their initial snapshot.
+	 */
+	if (reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_bt_leader_participate_as_worker(buildstate);
@@ -1622,9 +1642,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	 * Caller needs to wait for all launched workers when we return. Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
+
+	InvalidateCatalogSnapshot();
+	Assert(!reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
 }
 
 /*
@@ -1646,7 +1670,7 @@ _bt_end_parallel(BTLeader *btleader)
 		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
 
 	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(btleader->snapshot))
+	if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot))
 		UnregisterSnapshot(btleader->snapshot);
 	DestroyParallelContext(btleader->pcxt);
 	ExitParallelMode();
@@ -1896,6 +1920,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	SortCoordinate coordinate;
 	BTBuildState buildstate;
 	TableScanDesc scan;
+	ParallelTableScanDesc pscan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 
@@ -1950,11 +1975,15 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(btspool->index);
 	indexInfo->ii_Concurrent = btshared->isconcurrent;
-	scan = table_beginscan_parallel(btspool->heap,
-									ParallelTableScanFromBTShared(btshared));
+	pscan = ParallelTableScanFromBTShared(btshared);
+	scan = table_beginscan_parallel(btspool->heap, pscan);
 	reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo,
 									   true, progress, _bt_build_callback,
 									   &buildstate, scan);
+	InvalidateCatalogSnapshot();
+	if (pscan->phs_reset_snapshot)
+		PopActiveSnapshot();
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
 
 	/* Execute this worker's part of the sort */
 	if (progress)
@@ -1990,4 +2019,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	tuplesort_end(btspool->sortstate);
 	if (btspool2)
 		tuplesort_end(btspool2->sortstate);
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
+	if (pscan->phs_reset_snapshot)
+		PushActiveSnapshot(GetTransactionSnapshot());
 }
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index dfda1af412e..26df5638921 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 {
 	Size		sz = 0;
 
-	if (IsMVCCSnapshot(snapshot))
+	if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 		sz = add_size(sz, EstimateSnapshotSpace(snapshot));
 	else
-		Assert(snapshot == SnapshotAny);
+		Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot);
 
 	sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
 
@@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, bool reset_snapshot)
 {
 	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
-	if (IsMVCCSnapshot(snapshot))
+	/*
+	 * Initialize the parallel scan description. For normal scans with a
+	 * regular MVCC snapshot, serialize the snapshot info. For scans that
+	 * use periodic snapshot resets, mark the scan accordingly.
+	 */
+	if (reset_snapshot)
+	{
+		Assert(snapshot == InvalidSnapshot);
+		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = true;
+		INJECTION_POINT("table_parallelscan_initialize", NULL);
+	}
+	else if (IsMVCCSnapshot(snapshot))
 	{
 		SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
 		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = false;
 	}
 	else
 	{
 		Assert(snapshot == SnapshotAny);
+		Assert(!reset_snapshot);
 		pscan->phs_snapshot_any = true;
+		pscan->phs_reset_snapshot = false;
 	}
 }
 
@@ -171,7 +186,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 
 	Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
 
-	if (!pscan->phs_snapshot_any)
+	/*
+	 * For scans that use periodic snapshot resets, set the
+	 * SO_RESET_SNAPSHOT flag and use the current active snapshot as the
+	 * initial state.
+	 */
+	if (pscan->phs_reset_snapshot)
+	{
+		Assert(ActiveSnapshotSet());
+		flags |= SO_RESET_SNAPSHOT;
+		/* Start with the current active snapshot. */
+		snapshot = GetActiveSnapshot();
+	}
+	else if (!pscan->phs_snapshot_any)
 	{
 		/* Snapshot was serialized -- restore it */
 		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ab1dfb30e73..0498c07c37b 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -79,6 +79,7 @@
 #define PARALLEL_KEY_RELMAPPER_STATE	UINT64CONST(0xFFFFFFFFFFFF000D)
 #define PARALLEL_KEY_UNCOMMITTEDENUMS	UINT64CONST(0xFFFFFFFFFFFF000E)
 #define PARALLEL_KEY_CLIENTCONNINFO		UINT64CONST(0xFFFFFFFFFFFF000F)
+#define PARALLEL_KEY_SNAPSHOT_RESTORED	UINT64CONST(0xFFFFFFFFFFFF0010)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -308,6 +309,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
 														pcxt->nworkers));
 		shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+		shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool),
+														  pcxt->nworkers));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 		/* Estimate how much we'll need for the entrypoint info. */
 		shm_toc_estimate_chunk(&pcxt->estimator,
 							   strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
@@ -379,6 +384,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *entrypointstate;
 		char	   *uncommittedenumsspace;
 		char	   *clientconninfospace;
+		bool	   *snapshot_set_flag_space;
 		Size		lnamelen;
 
 		/* Serialize shared libraries we have loaded. */
@@ -494,6 +500,19 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		strcpy(entrypointstate, pcxt->library_name);
 		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
+
+		/*
+		 * Set up shared memory used by workers to report that they have
+		 * imported their initial snapshot.
+		 */
+		snapshot_set_flag_space =
+			shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers));
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			pcxt->worker[i].snapshot_restored = &snapshot_set_flag_space[i];
+			*pcxt->worker[i].snapshot_restored = false;
+		}
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space);
 	}
 
 	/* Update nworkers_to_launch, in case we changed nworkers above. */
@@ -670,6 +689,10 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * Wait for all workers to attach to their error queues, and throw an error if
  * any worker fails to do this.
  *
+ * wait_for_snapshot: if true, also wait until each parallel worker has restored
This is needed when using periodic snapshot resets to ensure all + * workers have a valid initial snapshot before proceeding with the scan. + * * Callers can assume that if this function returns successfully, then the * number of workers given by pcxt->nworkers_launched have initialized and * attached to their error queues. Whether or not these workers are guaranteed @@ -699,7 +722,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) * call this function at all. */ void -WaitForParallelWorkersToAttach(ParallelContext *pcxt) +WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot) { int i; @@ -743,9 +766,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) mq = shm_mq_get_queue(pcxt->worker[i].error_mqh); if (shm_mq_get_sender(mq) != NULL) { - /* Yes, so it is known to be attached. */ - pcxt->known_attached_workers[i] = true; - ++pcxt->nknown_attached_workers; + if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored)) + { + /* Yes, so it is known to be attached. */ + pcxt->known_attached_workers[i] = true; + ++pcxt->nknown_attached_workers; + } } } else if (status == BGWH_STOPPED) @@ -788,6 +814,16 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) break; } } + + /* Set snapshot restored flag to false. */ + if (pcxt->nworkers > 0) + { + bool *snapshot_restored_space; + snapshot_restored_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + for (i = 0; i < pcxt->nworkers; ++i) + snapshot_restored_space[i] = false; + } } /* @@ -1304,6 +1340,7 @@ ParallelWorkerMain(Datum main_arg) shm_toc *toc; FixedParallelState *fps; char *error_queue_space; + bool *snapshot_restored_space; shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; @@ -1507,6 +1544,10 @@ ParallelWorkerMain(Datum main_arg) fps->parallel_leader_pgproc); PushActiveSnapshot(asnapshot); + /* Snapshot is restored, set flag to make leader know about it. */ + snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + snapshot_restored_space[ParallelWorkerNumber] = true; + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 2ace60c1f07..b54921ad546 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1531,7 +1531,7 @@ index_concurrently_build(Oid heapRelationId, index_build(heapRel, indexRelation, indexInfo, false, true); InvalidateCatalogSnapshot(); - Assert((indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique) || !TransactionIdIsValid(MyProc->xmin)); + Assert(indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* Roll back any GUC changes executed by index functions */ AtEOXact_GUC(false, save_nestlevel); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index af3c788ce8b..01f9f330eee 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -371,7 +371,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel(node->ss.ss_currentRelation, pscan); diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c index 503817da65b..f31667117ad 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -455,7 +455,8 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt) pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 2e6197f5f35..df9116532c0 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -353,14 +353,6 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { - /* - * We might be able to relax this, but nothing that could otherwise work - * needs it. - */ - if (IsInParallelMode()) - elog(ERROR, - "cannot update SecondarySnapshot during a parallel operation"); - /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. 
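
Taken together with the executor changes above, callers of table_parallelscan_initialize() now choose between three snapshot modes. A minimal sketch of that decision, modeled on the _bt_begin_parallel() hunk earlier in this patch (error paths and the leader's later PopActiveSnapshot() omitted):

	/* Illustrative summary of the three modes; mirrors _bt_begin_parallel(). */
	bool		reset_snapshot = isconcurrent && !isunique;
	Snapshot	snapshot;

	if (!isconcurrent)
		snapshot = SnapshotAny;		/* scan everything, do own time quals */
	else if (reset_snapshot)
	{
		/* Workers will start from the leader's active snapshot. */
		snapshot = InvalidSnapshot;
		PushActiveSnapshot(GetTransactionSnapshot());
	}
	else
	{
		/* Unique concurrent build: one stable registered snapshot. */
		snapshot = RegisterSnapshot(GetTransactionSnapshot());
	}

	table_parallelscan_initialize(heap, pscan, snapshot, reset_snapshot);

Plain executor scans (nodeSeqscan.c and nodeTidrangescan.c above) always pass false for reset_snapshot and keep serializing es_snapshot as before.
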
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 60f857675e0..9a3e14e0c0e 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -28,6 +28,7 @@ typedef struct ParallelWorkerInfo
 {
 	BackgroundWorkerHandle *bgwhandle;
 	shm_mq_handle *error_mqh;
+	bool	   *snapshot_restored;
 } ParallelWorkerInfo;
 
 typedef struct ParallelContext
@@ -67,7 +68,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
-extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index ce340c076f8..acfa06aed78 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -81,6 +81,7 @@ typedef struct ParallelTableScanDescData
 	RelFileLocator phs_locator; /* physical relation to scan */
 	bool		phs_syncscan;	/* report location to syncscan logic? */
 	bool		phs_snapshot_any;	/* SnapshotAny, not phs_snapshot_data? */
+	bool		phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? */
 	Size		phs_snapshot_off;	/* data for snapshot */
 } ParallelTableScanDescData;
 typedef struct ParallelTableScanDescData *ParallelTableScanDesc;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index d0709782dde..84b06ffa42f 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1151,7 +1151,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  bool reset_snapshot);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -1783,7 +1784,7 @@ table_scan_analyze_next_tuple(TableScanDesc scan,
  * This only really makes sense for heap AM, it might need to be generalized
  * for other AMs later.
  *
- * In case of non-unique index and non-parallel concurrent build,
+ * In the case of a non-unique concurrent index build,
  * SO_RESET_SNAPSHOT is applied for the scan. That leads to changing snapshots
 * on the fly to allow xmin horizon propagate.
 */
diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
index 7f7dffa5be4..37819bf0fb7 100644
--- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
+++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
@@ -3,7 +3,7 @@ CREATE EXTENSION injection_points;
 SELECT injection_points_set_local();
 SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice');
 SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice');
-
+SELECT injection_points_attach('table_parallelscan_initialize', 'notice');
 
 CREATE SCHEMA cic_reset_snap;
 CREATE TABLE cic_reset_snap.tbl(i int primary key, j int);
@@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 -- The same in parallel mode
 ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2);
 
+-- Detach to keep the test stable, since a parallel worker may complete the scan before the leader
+SELECT injection_points_detach('heap_reset_scan_snapshot_effective');
+
 CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
-- 
2.43.0
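
For reference, the worker epilogue added separately to brin.c, gininsert.c, and nbtsort.c above is the same four-step pattern in each AM; it is shown here once as a composite sketch (the name of the shared-state variable differs per AM):

	/* Composite sketch of the per-worker epilogue (brin/gin/nbtree). */
	if (shared->isconcurrent)
	{
		PopActiveSnapshot();			/* drop the scan's last snapshot */
		InvalidateCatalogSnapshot();	/* drop the catalog snapshot too */
		Assert(!TransactionIdIsValid(MyProc->xmin));	/* horizon released */
		PushActiveSnapshot(GetTransactionSnapshot());	/* fresh one for cleanup */
	}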