From cce4851fd6f42f6023e522029d787ed42379780d Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Wed, 14 Jan 2026 18:45:56 +0300 Subject: [PATCH v1 3/4] Support snapshot resets in parallel concurrent index builds Extend periodic snapshot reset support to parallel builds, previously limited to non-parallel operations. This allows the xmin horizon to advance during parallel concurrent index builds as well. The main limitation of applying that technique to parallel builds was a requirement to wait until worker processes restore their initial snapshot from leader. To address this, following changes applied: - add infrastructure to track snapshot restoration in parallel workers - extend parallel scan initialization to support periodic snapshot resets - wait for parallel workers to restore their initial snapshots before proceeding with scan - relax limitation for parallel worker to call GetLatestSnapshot --- src/backend/access/brin/brin.c | 54 ++++++++++------- src/backend/access/gin/gininsert.c | 51 +++++++++------- src/backend/access/heap/heapam_handler.c | 12 ++-- src/backend/access/nbtree/nbtsort.c | 60 ++++++++++++++----- src/backend/access/table/tableam.c | 37 ++++++++++-- src/backend/access/transam/parallel.c | 49 +++++++++++++-- src/backend/catalog/index.c | 2 +- src/backend/executor/nodeSeqscan.c | 3 +- src/backend/executor/nodeTidrangescan.c | 3 +- src/backend/utils/time/snapmgr.c | 8 --- src/include/access/parallel.h | 3 +- src/include/access/relscan.h | 1 + src/include/access/tableam.h | 5 +- .../sql/cic_reset_snapshots.sql | 5 +- 14 files changed, 208 insertions(+), 85 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 42844b2ebd..6e43db0bdf 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -143,7 +143,6 @@ typedef struct BrinLeader */ BrinShared *brinshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } BrinLeader; @@ -231,7 
+230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _brin_parallel_estimate_shared(Relation heap); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -1222,7 +1221,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) reltuples = _brin_parallel_merge(state); _brin_end_parallel(state->bs_leader, state); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } else /* no parallel index build */ { @@ -1255,7 +1253,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) brin_fill_empty_ranges(state, state->bs_currRangeStart, state->bs_maxRangeStart); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } /* release resources */ @@ -1271,6 +1268,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = idxtuples; + InvalidateCatalogSnapshot(); + Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + return result; } @@ -2383,7 +2383,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { ParallelContext *pcxt; int scantuplesortstates; - Snapshot snapshot; Size estbrinshared; Size estsort; BrinShared *brinshared; @@ -2414,25 +2413,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). 
In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot is reset periodically. */ if (!isconcurrent) { Assert(ActiveSnapshotSet()); - snapshot = SnapshotAny; need_pop_active_snapshot = false; } else { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + Assert(!ActiveSnapshotSet()); PushActiveSnapshot(GetTransactionSnapshot()); } /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. */ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = _brin_parallel_estimate_shared(heap); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -2472,8 +2471,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) - UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; @@ -2498,7 +2495,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromBrinShared(brinshared), - snapshot); + isconcurrent ? InvalidSnapshot : SnapshotAny, + isconcurrent); /* * Store shared tuplesort-private state, for which we reserved space. 
@@ -2544,7 +2542,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->nparticipanttuplesorts++; brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; - brinleader->snapshot = snapshot; brinleader->walusage = walusage; brinleader->bufferusage = bufferusage; @@ -2560,6 +2557,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = brinleader; + /* + * In case of a concurrent build, snapshots are going to be reset periodically. + * We need to wait until all workers have imported the initial snapshot. + */ + if (isconcurrent) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _brin_leader_participate_as_worker(buildstate, heap, index); @@ -2568,9 +2572,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever.
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!isconcurrent) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + + InvalidateCatalogSnapshot(); + Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -2591,9 +2599,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); DestroyParallelContext(brinleader->pcxt); ExitParallelMode(); } @@ -2793,14 +2798,14 @@ _brin_parallel_merge(BrinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. + * brin index build. */ static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_brin_parallel_estimate_shared(Relation heap) { /* c.f. 
shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* @@ -2962,6 +2967,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); + if (brinshared->isconcurrent) + { + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); + PushActiveSnapshot(GetTransactionSnapshot()); + } /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 653ca50b96..3aa015f06a 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -133,7 +133,6 @@ typedef struct GinLeader */ GinBuildShared *ginshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } GinLeader; @@ -183,7 +182,7 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _gin_parallel_estimate_shared(Relation heap); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -750,7 +749,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) reltuples = _gin_parallel_merge(state); _gin_end_parallel(state->bs_leader, state); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } else /* no parallel index build */ { @@ -774,7 +772,6 @@ ginbuild(Relation heap, Relation index, IndexInfo 
*indexInfo) list, nlist, &buildstate.buildStats); } MemoryContextSwitchTo(oldCtx); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } MemoryContextDelete(buildstate.funcCtx); @@ -804,6 +801,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = buildstate.indtuples; + Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); return result; } @@ -938,7 +936,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, { ParallelContext *pcxt; int scantuplesortstates; - Snapshot snapshot; Size estginshared; Size estsort; GinBuildShared *ginshared; @@ -968,25 +965,25 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot is reset periodically. */ if (!isconcurrent) { Assert(ActiveSnapshotSet()); - snapshot = SnapshotAny; need_pop_active_snapshot = false; } else { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + Assert(!ActiveSnapshotSet()); PushActiveSnapshot(GetTransactionSnapshot()); } /* * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. 
*/ - estginshared = _gin_parallel_estimate_shared(heap, snapshot); + estginshared = _gin_parallel_estimate_shared(heap); shm_toc_estimate_chunk(&pcxt->estimator, estginshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1026,8 +1023,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) - UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; } @@ -1051,7 +1046,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + isconcurrent ? InvalidSnapshot : SnapshotAny, + isconcurrent); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1093,7 +1089,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginleader->nparticipanttuplesorts++; ginleader->ginshared = ginshared; ginleader->sharedsort = sharedsort; - ginleader->snapshot = snapshot; ginleader->walusage = walusage; ginleader->bufferusage = bufferusage; @@ -1109,6 +1104,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = ginleader; + /* + * In case of a concurrent build, snapshots are going to be reset periodically. + * We need to wait until all workers have imported the initial snapshot. + */ + if (isconcurrent) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _gin_leader_participate_as_worker(buildstate, heap, index); @@ -1117,9 +1119,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever.
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!isconcurrent) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!ginleader->ginshared->isconcurrent || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -1140,9 +1145,6 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) - UnregisterSnapshot(ginleader->snapshot); DestroyParallelContext(ginleader->pcxt); ExitParallelMode(); } @@ -1817,14 +1819,14 @@ _gin_parallel_merge(GinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. + * gin index build. */ static Size -_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_gin_parallel_estimate_shared(Relation heap) { /* c.f. 
shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(GinBuildShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* @@ -2209,6 +2211,13 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); + if (ginshared->isconcurrent) + { + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); + PushActiveSnapshot(GetTransactionSnapshot()); + } /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 8724413e51..c5156d535a 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1235,14 +1235,13 @@ heapam_index_build_range_scan(Relation heapRelation, * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, or during bootstrap, we take a regular MVCC snapshot - * and index whatever's live according to that. + * and index whatever's live according to that while that snapshot is reset + * every so often (in case of non-unique index). */ OldestXmin = InvalidTransactionId; /* * For unique index we need consistent snapshot for the whole scan. - * In case of parallel scan some additional infrastructure required - * to perform scan with SO_RESET_SNAPSHOT which is not yet ready. 
*/ reset_snapshots = indexInfo->ii_Concurrent && !indexInfo->ii_Unique && @@ -1304,8 +1303,11 @@ heapam_index_build_range_scan(Relation heapRelation, Assert(!IsBootstrapProcessingMode()); Assert(allow_sync); snapshot = scan->rs_snapshot; - PushActiveSnapshot(snapshot); - need_pop_active_snapshot = true; + if (!reset_snapshots) + { + PushActiveSnapshot(snapshot); + need_pop_active_snapshot = true; + } } hscan = (HeapScanDesc) scan; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index a1a5a1db9b..d55e205071 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -322,22 +322,20 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo) RelationGetRelationName(index)); reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Finish the build by (1) completing the sort of the spool file, (2) * inserting the sorted tuples into btree pages and (3) building the upper * levels. Finally, it may also be necessary to end use of parallelism. 
*/ - _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_ParallelWorkers && indexInfo->ii_Concurrent); + _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_Unique && indexInfo->ii_Concurrent); _bt_spooldestroy(buildstate.spool); if (buildstate.spool2) _bt_spooldestroy(buildstate.spool2); if (buildstate.btleader) _bt_end_parallel(buildstate.btleader); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); result = palloc_object(IndexBuildResult); @@ -486,8 +484,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate, reltuples = _bt_parallel_heapscan(buildstate, &indexInfo->ii_BrokenHotChain); InvalidateCatalogSnapshot(); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Set the progress target for the next phase. Reset the block number @@ -1419,6 +1416,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -1436,12 +1434,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) scantuplesortstates = leaderparticipates ? request + 1 : request; + /* + * For concurrent non-unique index builds, we can periodically reset snapshots + * to allow the xmin horizon to advance. This is safe since these builds don't + * require a consistent view across the entire scan. Unique indexes still need + * a stable snapshot to properly enforce uniqueness constraints. 
+ */ + reset_snapshot = isconcurrent && !btspool->isunique; + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * live according to that, while that snapshot may be reset periodically in + * case of non-unique index. */ if (!isconcurrent) { @@ -1449,6 +1456,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); @@ -1509,7 +1521,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -1536,7 +1548,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btshared->brokenhotchain = false; table_parallelscan_initialize(btspool->heap, ParallelTableScanFromBTShared(btshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1612,6 +1625,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Save leader state now that it's clear build will be parallel */ buildstate->btleader = btleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * Wait until all workers imported initial snapshot. 
+ */ + if (reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _bt_leader_participate_as_worker(buildstate); @@ -1620,9 +1640,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. */ - WaitForParallelWorkersToAttach(pcxt); + if (!reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); + + InvalidateCatalogSnapshot(); + Assert(!reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); } /* @@ -1644,7 +1668,7 @@ _bt_end_parallel(BTLeader *btleader) InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) + if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot)) UnregisterSnapshot(btleader->snapshot); DestroyParallelContext(btleader->pcxt); ExitParallelMode(); @@ -1894,6 +1918,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, SortCoordinate coordinate; BTBuildState buildstate; TableScanDesc scan; + ParallelTableScanDesc pscan; double reltuples; IndexInfo *indexInfo; @@ -1948,11 +1973,15 @@ /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); indexInfo->ii_Concurrent = btshared->isconcurrent; - scan = table_beginscan_parallel(btspool->heap, - ParallelTableScanFromBTShared(btshared)); + pscan = ParallelTableScanFromBTShared(btshared); + scan = table_beginscan_parallel(btspool->heap, pscan); reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo, true, progress, _bt_build_callback, &buildstate, scan); + InvalidateCatalogSnapshot(); + if (pscan->phs_reset_snapshot) + PopActiveSnapshot(); + Assert(!pscan->phs_reset_snapshot ||
!TransactionIdIsValid(MyProc->xmin)); /* Execute this worker's part of the sort */ if (progress) @@ -1988,4 +2017,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, tuplesort_end(btspool->sortstate); if (btspool2) tuplesort_end(btspool2->sortstate); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); + if (pscan->phs_reset_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); } diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 8749179652..91dcab3ff5 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) { Size sz = 0; - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) sz = add_size(sz, EstimateSnapshotSpace(snapshot)); else - Assert(snapshot == SnapshotAny); + Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot); sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel)); @@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot) + Snapshot snapshot, bool reset_snapshot) { Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); pscan->phs_snapshot_off = snapshot_off; - if (IsMVCCSnapshot(snapshot)) + /* + * Initialize parallel scan description. For normal scans with a regular + * MVCC snapshot, serialize the snapshot info. For scans that use periodic + * snapshot resets, mark the scan accordingly. 
+ */ + if (reset_snapshot) + { + Assert(snapshot == InvalidSnapshot); + pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = true; + INJECTION_POINT("table_parallelscan_initialize", NULL); + } + else if (IsMVCCSnapshot(snapshot)) { SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off); pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = false; } else { Assert(snapshot == SnapshotAny); + Assert(!reset_snapshot); pscan->phs_snapshot_any = true; + pscan->phs_reset_snapshot = false; } } @@ -171,7 +186,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan) Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator)); - if (!pscan->phs_snapshot_any) + /* + * For scans that + * use periodic snapshot resets, mark the scan accordingly and use the active + * snapshot as the initial state. + */ + if (pscan->phs_reset_snapshot) + { + Assert(ActiveSnapshotSet()); + flags |= SO_RESET_SNAPSHOT; + /* Start with current active snapshot. */ + snapshot = GetActiveSnapshot(); + } + else if (!pscan->phs_snapshot_any) { /* Snapshot was serialized -- restore it */ snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 01a89104ef..824d16ce71 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -77,6 +77,7 @@ #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D) #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E) #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F) +#define PARALLEL_KEY_SNAPSHOT_RESTORED UINT64CONST(0xFFFFFFFFFFFF0010) /* Fixed-size parallel state. 
*/ typedef struct FixedParallelState @@ -306,6 +307,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool), + pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate how much we'll need for the entrypoint info. */ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + strlen(pcxt->function_name) + 2); @@ -377,6 +382,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *entrypointstate; char *uncommittedenumsspace; char *clientconninfospace; + bool *snapshot_set_flag_space; Size lnamelen; /* Serialize shared libraries we have loaded. */ @@ -492,6 +498,19 @@ InitializeParallelDSM(ParallelContext *pcxt) strcpy(entrypointstate, pcxt->library_name); strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); + + /* + * Establish dynamic shared memory to pass information about importing + * of snapshot. + */ + snapshot_set_flag_space = + shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers)); + for (i = 0; i < pcxt->nworkers; ++i) + { + pcxt->worker[i].snapshot_restored = &snapshot_set_flag_space[i]; + *pcxt->worker[i].snapshot_restored = false; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space); } /* Update nworkers_to_launch, in case we changed nworkers above. */ @@ -668,6 +687,10 @@ LaunchParallelWorkers(ParallelContext *pcxt) * Wait for all workers to attach to their error queues, and throw an error if * any worker fails to do this. * + * wait_for_snapshot: track whether each parallel worker has successfully restored + * its snapshot. This is needed when using periodic snapshot resets to ensure all + * workers have a valid initial snapshot before proceeding with the scan. 
+ * * Callers can assume that if this function returns successfully, then the * number of workers given by pcxt->nworkers_launched have initialized and * attached to their error queues. Whether or not these workers are guaranteed @@ -697,7 +720,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) * call this function at all. */ void -WaitForParallelWorkersToAttach(ParallelContext *pcxt) +WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot) { int i; @@ -741,9 +764,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) mq = shm_mq_get_queue(pcxt->worker[i].error_mqh); if (shm_mq_get_sender(mq) != NULL) { - /* Yes, so it is known to be attached. */ - pcxt->known_attached_workers[i] = true; - ++pcxt->nknown_attached_workers; + if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored)) + { + /* Yes, so it is known to be attached. */ + pcxt->known_attached_workers[i] = true; + ++pcxt->nknown_attached_workers; + } } } else if (status == BGWH_STOPPED) @@ -786,6 +812,16 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) break; } } + + /* Set snapshot restored flag to false. */ + if (pcxt->nworkers > 0) + { + bool *snapshot_restored_space; + snapshot_restored_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + for (i = 0; i < pcxt->nworkers; ++i) + snapshot_restored_space[i] = false; + } } /* @@ -1302,6 +1338,7 @@ ParallelWorkerMain(Datum main_arg) shm_toc *toc; FixedParallelState *fps; char *error_queue_space; + bool *snapshot_restored_space; shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; @@ -1506,6 +1543,10 @@ ParallelWorkerMain(Datum main_arg) fps->parallel_leader_pgproc); PushActiveSnapshot(asnapshot); + /* Snapshot is restored, set flag to make leader know about it. */ + snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + snapshot_restored_space[ParallelWorkerNumber] = true; + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 2ace60c1f0..b54921ad54 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1531,7 +1531,7 @@ index_concurrently_build(Oid heapRelationId, index_build(heapRel, indexRelation, indexInfo, false, true); InvalidateCatalogSnapshot(); - Assert((indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique) || !TransactionIdIsValid(MyProc->xmin)); + Assert(indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* Roll back any GUC changes executed by index functions */ AtEOXact_GUC(false, save_nestlevel); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index b8119face4..acf7b77031 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -371,7 +371,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel(node->ss.ss_currentRelation, pscan); diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c index 4aa28918e9..80f3950eb4 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -455,7 +455,8 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt) pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 2e6197f5f3..df9116532c 100644 --- 
a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -353,14 +353,6 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { - /* - * We might be able to relax this, but nothing that could otherwise work - * needs it. - */ - if (IsInParallelMode()) - elog(ERROR, - "cannot update SecondarySnapshot during a parallel operation"); - /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 01bdf2bec1..66ea984a08 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -26,6 +26,7 @@ typedef struct ParallelWorkerInfo { BackgroundWorkerHandle *bgwhandle; shm_mq_handle *error_mqh; + bool *snapshot_restored; } ParallelWorkerInfo; typedef struct ParallelContext @@ -65,7 +66,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch); extern void LaunchParallelWorkers(ParallelContext *pcxt); -extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); +extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot); extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index ce340c076f..acfa06aed7 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -81,6 +81,7 @@ typedef struct ParallelTableScanDescData RelFileLocator phs_locator; /* physical relation to scan */ bool phs_syncscan; /* report location to syncscan logic? */ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */ + bool phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? 
*/ Size phs_snapshot_off; /* data for snapshot */ } ParallelTableScanDescData; typedef struct ParallelTableScanDescData *ParallelTableScanDesc; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5ef8b431d2..bdadd511fe 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1140,7 +1140,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot); */ extern void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot); + Snapshot snapshot, + bool reset_snapshot); /* * Begin a parallel scan. `pscan` needs to have been initialized with @@ -1772,7 +1773,7 @@ table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, * This only really makes sense for heap AM, it might need to be generalized * for other AMs later. * - * In case of non-unique index and non-parallel concurrent build, + * In the case of a non-unique concurrent index build, * SO_RESET_SNAPSHOT is applied for the scan. That leads to changing snapshots * on the fly to allow xmin horizon propagate.
*/ diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql index 7f7dffa5be..37819bf0fb 100644 --- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql +++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql @@ -3,7 +3,7 @@ CREATE EXTENSION injection_points; SELECT injection_points_set_local(); SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice'); SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'); - +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); @@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep the test stable, since a parallel worker may complete its scan before the leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- 2.43.0