Thread: why there is not VACUUM FULL CONCURRENTLY?
Hi
I have one question: what is blocking the implementation of some variant of VACUUM FULL that works like REINDEX CONCURRENTLY? Why can't a mechanism similar to REINDEX CONCURRENTLY be used for VACUUM FULL?
Regards
Pavel
On Tue, Jan 30, 2024 at 09:01:57AM +0100, Pavel Stehule wrote:
> I have one question: what is blocking the implementation of some variant of
> VACUUM FULL that works like REINDEX CONCURRENTLY? Why can't a mechanism
> similar to REINDEX CONCURRENTLY be used for VACUUM FULL?
You may be interested in these threads:
https://www.postgresql.org/message-id/CAB7nPqTGmNUFi%2BW6F1iwmf7J-o6sY%2Bxxo6Yb%3DmkUVYT-CG-B5A%40mail.gmail.com
https://www.postgresql.org/message-id/CAB7nPqTys6JUQDxUczbJb0BNW0kPrW8WdZuk11KaxQq6o98PJg%40mail.gmail.com
VACUUM FULL is CLUSTER under the hood. One may question whether it
is still a relevant discussion these days if we assume that autovacuum
is able to keep up, because it always keeps up with the house cleanup,
right? ;)
More seriously, we have a lot more options these days with VACUUM like
PARALLEL, so CONCURRENTLY may still have some uses, but the new toys
available may have changed things. So, would it be worth the
complexities around heap manipulations that lower locks would require?
--
Michael
On Tue, Jan 30, 2024 at 9:14, Michael Paquier <michael@paquier.xyz> wrote:
> On Tue, Jan 30, 2024 at 09:01:57AM +0100, Pavel Stehule wrote:
> > I have one question: what is blocking the implementation of some variant of
> > VACUUM FULL that works like REINDEX CONCURRENTLY? Why can't a mechanism
> > similar to REINDEX CONCURRENTLY be used for VACUUM FULL?
> You may be interested in these threads:
> https://www.postgresql.org/message-id/CAB7nPqTGmNUFi%2BW6F1iwmf7J-o6sY%2Bxxo6Yb%3DmkUVYT-CG-B5A%40mail.gmail.com
> https://www.postgresql.org/message-id/CAB7nPqTys6JUQDxUczbJb0BNW0kPrW8WdZuk11KaxQq6o98PJg%40mail.gmail.com
> VACUUM FULL is CLUSTER under the hood. One may question whether it
> is still a relevant discussion these days if we assume that autovacuum
> is able to keep up, because it always keeps up with the house cleanup,
> right? ;)
> More seriously, we have a lot more options these days with VACUUM like
> PARALLEL, so CONCURRENTLY may still have some uses, but the new toys
> available may have changed things. So, would it be worth the
> complexities around heap manipulations that lower locks would require?
One of my customers is today reducing one table from 140GB to 20GB. Now he is able to run archiving. He had to use pg_repack, and it works well today, but I ask myself whether what pg_repack does would really be hard to do internally, since something similar already had to be done for REINDEX CONCURRENTLY. This is not a common task, and never will be, but on the other hand it would be a nice feature to have, and maybe not too hard to implement today. But I didn't try it.
I'll read the threads.
Pavel
> --
> Michael
On 2024-Jan-30, Pavel Stehule wrote:
> One of my customers is today reducing one table from 140GB to 20GB. Now he
> is able to run archiving. He had to use pg_repack, and it works well today,
> but I ask myself whether what pg_repack does would really be hard to do
> internally, since something similar already had to be done for REINDEX
> CONCURRENTLY. This is not a common task, and never will be, but on the
> other hand it would be a nice feature to have, and maybe not too hard to
> implement today. But I didn't try it.
FWIW a newer, more modern and more trustworthy alternative to pg_repack
is pg_squeeze, which I discovered almost by random chance, and soon
discovered I liked it much more.
So thinking about your question, I think it might be possible to
integrate a tool that works like pg_squeeze, such that it runs when
VACUUM is invoked -- either under some new option, or just replace the
code under FULL, not sure. If the Cybertec people allow it, we could
just grab the pg_squeeze code and add it to the things that VACUUM can
run.
Now, pg_squeeze has some additional features, such as periodic
"squeezing" of tables. In a first attempt, for simplicity, I would
leave that stuff out and just allow it to run from the user invoking it,
and then have the command to do a single run. (The scheduling features
could be added later, or somehow integrated into autovacuum, or maybe
something else.)
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"We're here to devour each other alive" (Hobbes)
On Tue, Jan 30, 2024 at 11:31, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> On 2024-Jan-30, Pavel Stehule wrote:
> > One of my customers is today reducing one table from 140GB to 20GB. Now he
> > is able to run archiving. He had to use pg_repack, and it works well today,
> > but I ask myself whether what pg_repack does would really be hard to do
> > internally, since something similar already had to be done for REINDEX
> > CONCURRENTLY. This is not a common task, and never will be, but on the
> > other hand it would be a nice feature to have, and maybe not too hard to
> > implement today. But I didn't try it.
> FWIW a newer, more modern and more trustworthy alternative to pg_repack
> is pg_squeeze, which I discovered almost by random chance, and soon
> discovered I liked it much more.
> So thinking about your question, I think it might be possible to
> integrate a tool that works like pg_squeeze, such that it runs when
> VACUUM is invoked -- either under some new option, or just replace the
> code under FULL, not sure. If the Cybertec people allow it, we could
> just grab the pg_squeeze code and add it to the things that VACUUM can
> run.
> Now, pg_squeeze has some additional features, such as periodic
> "squeezing" of tables. In a first attempt, for simplicity, I would
> leave that stuff out and just allow it to run from the user invoking it,
> and then have the command to do a single run. (The scheduling features
> could be added later, or somehow integrated into autovacuum, or maybe
> something else.)
Some basic variant (without autovacuum support) can be good enough. We have no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity for it (sure, that may be limited by my perspective). The need to reduce table size is not too common (a lot of use cases are better covered by partitioning), but sometimes it arises, and then a simple built-in solution can be helpful.
> --
> Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
> "We're here to devour each other alive" (Hobbes)
On 2024-Jan-30, Pavel Stehule wrote:
> Some basic variant (without autovacuum support) can be good enough. We have
> no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity
> for it (sure, that may be limited by my perspective). The need to reduce
> table size is not too common (a lot of use cases are better covered by
> partitioning), but sometimes it arises, and then a simple built-in
> solution can be helpful.
That's my thinking as well.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
On Tue, Jan 30, 2024 at 12:37:12PM +0100, Alvaro Herrera wrote:
> On 2024-Jan-30, Pavel Stehule wrote:
>
> > Some basic variant (without autovacuum support) can be good enough. We have
> > no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity
> > for it (sure, that may be limited by my perspective). The need to reduce
> > table size is not too common (a lot of use cases are better covered by
> > partitioning), but sometimes it arises, and then a simple built-in
> > solution can be helpful.
>
> That's my thinking as well.
Or, yes, I'd agree about that. This can make for a much better user experience. I'm just not sure how that stuff would be shaped and how much ground it would need to cover.
--
Michael
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> On 2024-Jan-30, Pavel Stehule wrote:
>
> > One of my customers is today reducing one table from 140GB to 20GB. Now he
> > is able to run archiving. He had to use pg_repack, and it works well today,
> > but I ask myself whether what pg_repack does would really be hard to do
> > internally, since something similar already had to be done for REINDEX
> > CONCURRENTLY. This is not a common task, and never will be, but on the
> > other hand it would be a nice feature to have, and maybe not too hard to
> > implement today. But I didn't try it.
>
> FWIW a newer, more modern and more trustworthy alternative to pg_repack
> is pg_squeeze, which I discovered almost by random chance, and soon
> discovered I liked it much more.
>
> So thinking about your question, I think it might be possible to
> integrate a tool that works like pg_squeeze, such that it runs when
> VACUUM is invoked -- either under some new option, or just replace the
> code under FULL, not sure. If the Cybertec people allow it, we could
> just grab the pg_squeeze code and add it to the things that VACUUM can
> run.
There are no objections from Cybertec. Nevertheless, I don't expect much code to be just copy & pasted. If I started to implement the extension today, I'd do some things in a different way. (Some things might actually be simpler in the core, i.e. a few small changes in PG core are easier than the related workarounds in the extension.)
The core idea is that:
1) a "historic snapshot" is used to get the current contents of the table,
2) logical decoding is used to capture the changes done while the data is being copied to new storage,
3) the exclusive lock on the table is only taken for a very short time, to swap the storage (relfilenode) of the table.
I think it should be coded in a way that allows use by VACUUM FULL, CLUSTER, and possibly some subcommands of ALTER TABLE. For example, some users of pg_squeeze requested an enhancement that allows the user to change a column's data type w/o service disruption (typically when it appears that an integer column is going to overflow and a change to bigint is needed). Online (re)partitioning could be another use case, although I admit that commands that change the system catalog are a bit harder to implement than VACUUM FULL / CLUSTER.
One thing that pg_squeeze does not handle is visibility: it uses heap_insert() to insert the tuples into the new storage, so the problems described in [1] can appear. The in-core implementation should rather do something like tuple rewriting (rewriteheap.c).
Is your plan to work on it soon or should I try to write a draft patch? (I assume this is for PG >= 18.)
[1] https://www.postgresql.org/docs/current/mvcc-caveats.html
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
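The three-step idea from the message above (copy what a snapshot sees, capture concurrent changes, swap the storage at the end) can be sketched as a toy, single-threaded model. This is only an illustration under loud assumptions: every name here (ToyTable, ToyStorage, toy_begin_rewrite and so on) is hypothetical, none of it is PostgreSQL or pg_squeeze code, and "concurrent" writes are simulated by calling toy_insert() between the steps. Only inserts are modelled.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_ROWS 16

/* Toy storage file: rows plus a liveness flag (0 = dead row, i.e. bloat). */
typedef struct ToyStorage {
    int    values[TOY_MAX_ROWS];
    int    live[TOY_MAX_ROWS];
    size_t nrows;
} ToyStorage;

/* Changes captured while the copy runs (inserts only, for brevity). */
typedef struct ToyChangeQueue {
    int    inserted[TOY_MAX_ROWS];
    size_t n;
} ToyChangeQueue;

typedef struct ToyTable {
    ToyStorage     *storage;  /* hypothetical stand-in for the relfilenode */
    ToyChangeQueue *capture;  /* non-NULL while a rewrite is in progress */
} ToyTable;

/* Append a live row; returns 0 on success, -1 if the storage is full. */
static int toy_append(ToyStorage *s, int value)
{
    if (s->nrows >= TOY_MAX_ROWS)
        return -1;
    s->values[s->nrows] = value;
    s->live[s->nrows] = 1;
    s->nrows++;
    return 0;
}

/* Application-side insert: goes to the current storage and, while a
 * rewrite is running, is also recorded in the capture queue. */
static int toy_insert(ToyTable *t, int value)
{
    if (toy_append(t->storage, value) != 0)
        return -1;
    if (t->capture != NULL) {
        assert(t->capture->n < TOY_MAX_ROWS);
        t->capture->inserted[t->capture->n++] = value;
    }
    return 0;
}

/* Step 1a: start capturing and remember how many rows the "snapshot" sees. */
static size_t toy_begin_rewrite(ToyTable *t, ToyChangeQueue *q)
{
    q->n = 0;
    t->capture = q;
    return t->storage->nrows;
}

/* Step 1b: copy only the live rows visible to the snapshot. */
static void toy_copy_snapshot(const ToyStorage *old, size_t snapshot_rows,
                              ToyStorage *fresh)
{
    for (size_t i = 0; i < snapshot_rows; i++)
        if (old->live[i])
            (void) toy_append(fresh, old->values[i]);
}

/* Steps 2 and 3: replay the captured changes, then swap the storage. */
static void toy_finish_rewrite(ToyTable *t, ToyStorage *fresh)
{
    for (size_t i = 0; i < t->capture->n; i++)
        (void) toy_append(fresh, t->capture->inserted[i]);
    t->storage = fresh;
    t->capture = NULL;
}
```

In this model a row inserted after toy_begin_rewrite() is not seen by the snapshot copy, so it reaches the new storage exactly once, through the capture queue. A real implementation would also have to handle updates, deletes and tuple visibility, which the message above calls out as the hard part.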
This is great to hear.
On 2024-Jan-31, Antonin Houska wrote:
> Is your plan to work on it soon or should I try to write a draft patch? (I
> assume this is for PG >= 18.)
I don't have plans for it, so if you have resources, please go for it.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> This is great to hear.
>
> On 2024-Jan-31, Antonin Houska wrote:
>
> > Is your plan to work on it soon or should I try to write a draft patch? (I
> > assume this is for PG >= 18.)
>
> I don't have plans for it, so if you have resources, please go for it.
OK, I'm thinking about how the feature can be integrated into the core.
BTW, I'm failing to understand why cluster_rel() has no argument of the BufferAccessStrategy type. According to buffer/README, the criterion for using a specific strategy is that the page "is unlikely to be needed again soon". Specifically for cluster_rel(), the page will *definitely* not be used again (unless the VACUUM FULL/CLUSTER command fails): BufferTag contains the relation file number and the old relation file is eventually dropped.
Am I missing anything?
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
On 2024-Feb-16, Antonin Houska wrote:
> BTW, I'm failing to understand why cluster_rel() has no argument of the
> BufferAccessStrategy type. According to buffer/README, the criterion for using
> a specific strategy is that the page "is unlikely to be needed again
> soon". Specifically for cluster_rel(), the page will *definitely* not be used
> again (unless the VACUUM FULL/CLUSTER command fails): BufferTag contains the
> relation file number and the old relation file is eventually dropped.
>
> Am I missing anything?
No, that's just an oversight. Access strategies are newer than that cluster code.
--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
"Most hackers will be perfectly comfortable conceptualizing users as entropy sources, so let's move on." (Nathaniel Smith)
https://mail.gnu.org/archive/html/monotone-devel/2007-01/msg00080.html
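To illustrate why an access strategy suits pages that are "unlikely to be needed again soon", here is a minimal sketch of the ring idea: a bulk scan recycles a small, fixed set of slots instead of spreading over the whole cache. It is a toy model, not the PostgreSQL buffer manager; ToyRing, TOY_RING_SIZE and the function names are hypothetical, and pages are just integers.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_RING_SIZE 4   /* hypothetical ring size, chosen for the example */

/* A tiny ring of buffer slots; -1 marks an empty slot. */
typedef struct ToyRing {
    int    slots[TOY_RING_SIZE];
    size_t next;              /* slot to reuse for the next page */
} ToyRing;

static void toy_ring_init(ToyRing *r)
{
    for (size_t i = 0; i < TOY_RING_SIZE; i++)
        r->slots[i] = -1;
    r->next = 0;
}

/* Load a page into the ring, reusing the oldest slot.
 * Returns the page that was evicted, or -1 if the slot was empty. */
static int toy_ring_read_page(ToyRing *r, int page)
{
    int evicted = r->slots[r->next];

    r->slots[r->next] = page;
    r->next = (r->next + 1) % TOY_RING_SIZE;
    return evicted;
}

/* Number of slots currently occupied; never exceeds TOY_RING_SIZE. */
static size_t toy_ring_used(const ToyRing *r)
{
    size_t used = 0;

    for (size_t i = 0; i < TOY_RING_SIZE; i++)
        if (r->slots[i] != -1)
            used++;
    return used;
}
```

However many pages the scan reads, it occupies at most TOY_RING_SIZE slots, so pages cached for other work are not pushed out. That is the property the question above relies on when it argues that the old heap's pages will not be needed again.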
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> > Is your plan to work on it soon or should I try to write a draft patch? (I
> > assume this is for PG >= 18.)
>
> I don't have plans for it, so if you have resources, please go for it.
The first version is attached. The actual feature is in 0003. 0004 is probably not necessary now, but I didn't realize that until I had coded it.
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
From f47a98b9b4580a581aacf73c553b87ca6bf16533 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:45:59 +0200
Subject: [PATCH 1/4] Adjust signature of cluster_rel() and its subroutines.
So far cluster_rel() received OID of the relation it should process and it performed opening and locking of the relation itself. Yet copy_table_data() received the OID as well and also had to open the relation itself. This patch tries to eliminate the repeated opening and closing.
One particular reason for this change is that the VACUUM FULL / CLUSTER command with the CONCURRENTLY option will need to release all locks on the relation (and possibly on the clustering index) at some point. Since it makes little sense to keep relation reference w/o lock, the cluster_rel() function also closes its reference to the relation (and its index). Neither the function nor its subroutines may open extra references because then it'd be a bit harder to close them all.
--- src/backend/commands/cluster.c | 146 ++++++++++++++++++------------- src/backend/commands/matview.c | 2 +- src/backend/commands/tablecmds.c | 2 +- src/backend/commands/vacuum.c | 12 +-- src/include/commands/cluster.h | 5 +- 5 files changed, 99 insertions(+), 68 deletions(-) diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 78f96789b0..194d143cf4 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -70,8 +70,8 @@ typedef struct static void cluster_multiple_rels(List *rtcs, ClusterParams *params); -static void rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose); -static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, +static void rebuild_relation(Relation OldHeap, Relation index, bool verbose); +static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti); static List *get_tables_to_cluster(MemoryContext cluster_context); @@ -194,11 +194,11 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { - /* close relation, keep lock till commit */ - table_close(rel, NoLock); - - /* Do the job. */ - cluster_rel(tableOid, indexOid, &params); + /* + * Do the job. (The function will close the relation, lock is kept + * till commit.) + */ + cluster_rel(rel, indexOid, &params); return; } @@ -275,6 +275,7 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params) foreach(lc, rtcs) { RelToCluster *rtc = (RelToCluster *) lfirst(lc); + Relation rel; /* Start a new transaction for each relation. */ StartTransactionCommand(); @@ -282,8 +283,13 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params) /* functions in indexes may want a snapshot set */ PushActiveSnapshot(GetTransactionSnapshot()); - /* Do the job. 
*/ - cluster_rel(rtc->tableOid, rtc->indexOid, params); + rel = table_open(rtc->tableOid, AccessExclusiveLock); + + /* + * Do the job. (The function will close the relation, lock is kept + * till commit.) + */ + cluster_rel(rel, rtc->indexOid, params); PopActiveSnapshot(); CommitTransactionCommand(); @@ -306,16 +312,19 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params) * If indexOid is InvalidOid, the table will be rewritten in physical order * instead of index order. This is the new implementation of VACUUM FULL, * and error messages should refer to the operation as VACUUM not CLUSTER. + * + * We expect that OldHeap is already locked in AccessExclusiveLock mode. */ void -cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params) +cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) { - Relation OldHeap; + Oid tableOid = RelationGetRelid(OldHeap); Oid save_userid; int save_sec_context; int save_nestlevel; bool verbose = ((params->options & CLUOPT_VERBOSE) != 0); bool recheck = ((params->options & CLUOPT_RECHECK) != 0); + Relation index = NULL; /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); @@ -328,21 +337,6 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params) pgstat_progress_update_param(PROGRESS_CLUSTER_COMMAND, PROGRESS_CLUSTER_COMMAND_VACUUM_FULL); - /* - * We grab exclusive access to the target rel and index for the duration - * of the transaction. (This is redundant for the single-transaction - * case, since cluster() already did it.) The index lock is taken inside - * check_index_is_clusterable. - */ - OldHeap = try_relation_open(tableOid, AccessExclusiveLock); - - /* If the table has gone away, we can skip processing it */ - if (!OldHeap) - { - pgstat_progress_end_command(); - return; - } - /* * Switch to the table owner's userid, so that any index functions are run * as that user. 
Also lock down security-restricted operations and @@ -445,7 +439,11 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params) /* Check heap and index are valid to cluster on */ if (OidIsValid(indexOid)) + { check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock); + /* Open the index (It should already be locked.) */ + index = index_open(indexOid, NoLock); + } /* * Quietly ignore the request if this is a materialized view which has not @@ -474,9 +472,12 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params) TransferPredicateLocksToHeapRelation(OldHeap); /* rebuild_relation does all the dirty work */ - rebuild_relation(OldHeap, indexOid, verbose); + rebuild_relation(OldHeap, index, verbose); - /* NB: rebuild_relation does table_close() on OldHeap */ + /* + * NB: rebuild_relation does table_close() on OldHeap, and also on index, + * if the pointer is valid. + */ out: /* Roll back any GUC changes executed by index functions */ @@ -625,22 +626,27 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal) * rebuild_relation: rebuild an existing relation in index or physical order * * OldHeap: table to rebuild --- must be opened and exclusive-locked! - * indexOid: index to cluster by, or InvalidOid to rewrite in physical order. + * index: index to cluster by, or NULL to rewrite in physical order. Must be + * opened and locked. * - * NB: this routine closes OldHeap at the right time; caller should not. + * On exit, the heap (and also the index, if one was passed) are closed, but + * still locked with AccessExclusiveLock. */ static void -rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose) +rebuild_relation(Relation OldHeap, Relation index, bool verbose) { Oid tableOid = RelationGetRelid(OldHeap); + Oid indexOid = index ? 
RelationGetRelid(index) : InvalidOid; Oid accessMethod = OldHeap->rd_rel->relam; Oid tableSpace = OldHeap->rd_rel->reltablespace; Oid OIDNewHeap; + Relation NewHeap; char relpersistence; bool is_system_catalog; bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; + LOCKMODE lmode_new; if (OidIsValid(indexOid)) /* Mark the correct index as clustered */ @@ -650,19 +656,40 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose) relpersistence = OldHeap->rd_rel->relpersistence; is_system_catalog = IsSystemRelation(OldHeap); - /* Close relcache entry, but keep lock until transaction commit */ - table_close(OldHeap, NoLock); - - /* Create the transient table that will receive the re-ordered data */ + /* + * Create the transient table that will receive the re-ordered data. + * + * NoLock for the old heap because we already have it locked and want to + * keep unlocking straightforward. + */ + lmode_new = AccessExclusiveLock; OIDNewHeap = make_new_heap(tableOid, tableSpace, accessMethod, relpersistence, - AccessExclusiveLock); + NoLock, &lmode_new); + Assert(lmode_new == AccessExclusiveLock || lmode_new == NoLock); + /* Lock iff not done above. */ + NewHeap = table_open(OIDNewHeap, lmode_new == NoLock ? + AccessExclusiveLock : NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(OIDNewHeap, tableOid, indexOid, verbose, + copy_table_data(NewHeap, OldHeap, index, verbose, &swap_toast_by_content, &frozenXid, &cutoffMulti); + + /* Close relcache entries, but keep lock until transaction commit */ + table_close(OldHeap, NoLock); + if (index) + index_close(index, NoLock); + + /* + * Close the new relation so it can be dropped as soon as the storage is + * swapped. The relation is not visible to others, so we could unlock it + * completely, but it's simpler to pass NoLock than to track all the locks + * acquired so far. 
+ */ + table_close(NewHeap, NoLock); + /* * Swap the physical files of the target and transient tables, then * rebuild the target's indexes and throw away the transient table. @@ -683,10 +710,15 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose) * * After this, the caller should load the new heap with transferred/modified * data, then call finish_heap_swap to complete the operation. + * + * If a specific lock mode is needed for the new relation, pass it via the + * in/out parameter lockmode_new_p. On exit, the output value tells whether + * the lock was actually acquired. */ Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, - char relpersistence, LOCKMODE lockmode) + char relpersistence, LOCKMODE lockmode_old, + LOCKMODE *lockmode_new_p) { TupleDesc OldHeapDesc; char NewHeapName[NAMEDATALEN]; @@ -697,8 +729,17 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, Datum reloptions; bool isNull; Oid namespaceid; + LOCKMODE lockmode_new; - OldHeap = table_open(OIDOldHeap, lockmode); + if (lockmode_new_p) + { + lockmode_new = *lockmode_new_p; + *lockmode_new_p = NoLock; + } + else + lockmode_new = lockmode_old; + + OldHeap = table_open(OIDOldHeap, lockmode_old); OldHeapDesc = RelationGetDescr(OldHeap); /* @@ -792,7 +833,9 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, if (isNull) reloptions = (Datum) 0; - NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid); + NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode_new, toastid); + if (lockmode_new_p) + *lockmode_new_p = lockmode_new; ReleaseSysCache(tuple); } @@ -811,13 +854,13 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, * *pCutoffMulti receives the MultiXactId used as a cutoff point. 
*/ static void -copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose, +copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) { - Relation NewHeap, - OldHeap, - OldIndex; + Oid OIDOldHeap = RelationGetRelid(OldHeap); + Oid OIDOldIndex = OldIndex ? RelationGetRelid(OldIndex) : InvalidOid; + Oid OIDNewHeap = RelationGetRelid(NewHeap); Relation relRelation; HeapTuple reltup; Form_pg_class relform; @@ -836,16 +879,6 @@ copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose, pg_rusage_init(&ru0); - /* - * Open the relations we need. - */ - NewHeap = table_open(OIDNewHeap, AccessExclusiveLock); - OldHeap = table_open(OIDOldHeap, AccessExclusiveLock); - if (OidIsValid(OIDOldIndex)) - OldIndex = index_open(OIDOldIndex, AccessExclusiveLock); - else - OldIndex = NULL; - /* Store a copy of the namespace name for logging purposes */ nspname = get_namespace_name(RelationGetNamespace(OldHeap)); @@ -1001,11 +1034,6 @@ copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose, tups_recently_dead, pg_rusage_show(&ru0)))); - if (OldIndex != NULL) - index_close(OldIndex, NoLock); - table_close(OldHeap, NoLock); - table_close(NewHeap, NoLock); - /* Update pg_class to reflect the correct values of pages and tuples. 
*/ relRelation = table_open(RelationRelationId, RowExclusiveLock); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index ea05d4b224..488ca950d9 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -296,7 +296,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ OIDNewHeap = make_new_heap(matviewOid, tableSpace, matviewRel->rd_rel->relam, - relpersistence, ExclusiveLock); + relpersistence, ExclusiveLock, NULL); LockRelationOid(OIDNewHeap, AccessExclusiveLock); dest = CreateTransientRelDestReceiver(OIDNewHeap); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index dbfe0d6b1c..5d6151dad1 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -5841,7 +5841,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode, * unlogged anyway. */ OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod, - persistence, lockmode); + persistence, lockmode, NULL); /* * Copy the heap data into the new table with the desired diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 48f8eab202..0bd000acc5 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -2196,15 +2196,17 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, { ClusterParams cluster_params = {0}; - /* close relation before vacuuming, but hold lock until commit */ - relation_close(rel, NoLock); - rel = NULL; - if ((params->options & VACOPT_VERBOSE) != 0) cluster_params.options |= CLUOPT_VERBOSE; /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ - cluster_rel(relid, InvalidOid, &cluster_params); + cluster_rel(rel, InvalidOid, &cluster_params); + + /* + * cluster_rel() should have closed the relation, lock is kept + * till commit. 
+ */ + rel = NULL; } else table_relation_vacuum(rel, params, bstrategy); diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 4e32380417..7492796ea2 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -32,13 +32,14 @@ typedef struct ClusterParams } ClusterParams; extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel); -extern void cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params); +extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params); extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode); extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, - char relpersistence, LOCKMODE lockmode); + char relpersistence, LOCKMODE lockmode_old, + LOCKMODE *lockmode_new_p); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, -- 2.45.2 From cdf67d933a56323c0e5ca77495f60017d398bbd5 Mon Sep 17 00:00:00 2001 From: Antonin Houska <ah@cybertec.at> Date: Tue, 9 Jul 2024 17:45:59 +0200 Subject: [PATCH 2/4] Move progress related fields from PgBackendStatus to PgBackendProgress. VACUUM FULL / CLUSTER CONCURRENTLY will need to save and restore these fields at some point. 
--- src/backend/utils/activity/backend_progress.c | 18 +++++++++--------- src/backend/utils/activity/backend_status.c | 4 ++-- src/backend/utils/adt/pgstatfuncs.c | 6 +++--- src/include/utils/backend_progress.h | 14 ++++++++++++++ src/include/utils/backend_status.h | 14 ++------------ 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c index bfb9b7704b..e7c8bfba94 100644 --- a/src/backend/utils/activity/backend_progress.c +++ b/src/backend/utils/activity/backend_progress.c @@ -33,9 +33,9 @@ pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid) return; PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); - beentry->st_progress_command = cmdtype; - beentry->st_progress_command_target = relid; - MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param)); + beentry->st_progress.command = cmdtype; + beentry->st_progress.command_target = relid; + MemSet(&beentry->st_progress.param, 0, sizeof(beentry->st_progress.param)); PGSTAT_END_WRITE_ACTIVITY(beentry); } @@ -56,7 +56,7 @@ pgstat_progress_update_param(int index, int64 val) return; PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); - beentry->st_progress_param[index] = val; + beentry->st_progress.param[index] = val; PGSTAT_END_WRITE_ACTIVITY(beentry); } @@ -77,7 +77,7 @@ pgstat_progress_incr_param(int index, int64 incr) return; PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); - beentry->st_progress_param[index] += incr; + beentry->st_progress.param[index] += incr; PGSTAT_END_WRITE_ACTIVITY(beentry); } @@ -134,7 +134,7 @@ pgstat_progress_update_multi_param(int nparam, const int *index, { Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM); - beentry->st_progress_param[index[i]] = val[i]; + beentry->st_progress.param[index[i]] = val[i]; } PGSTAT_END_WRITE_ACTIVITY(beentry); @@ -155,11 +155,11 @@ pgstat_progress_end_command(void) if (!beentry || !pgstat_track_activities) return; - if (beentry->st_progress_command 
== PROGRESS_COMMAND_INVALID) + if (beentry->st_progress.command == PROGRESS_COMMAND_INVALID) return; PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); - beentry->st_progress_command = PROGRESS_COMMAND_INVALID; - beentry->st_progress_command_target = InvalidOid; + beentry->st_progress.command = PROGRESS_COMMAND_INVALID; + beentry->st_progress.command_target = InvalidOid; PGSTAT_END_WRITE_ACTIVITY(beentry); } diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index 1ccf4c6d83..b54a35d91c 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -378,8 +378,8 @@ pgstat_bestart(void) #endif lbeentry.st_state = STATE_UNDEFINED; - lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID; - lbeentry.st_progress_command_target = InvalidOid; + lbeentry.st_progress.command = PROGRESS_COMMAND_INVALID; + lbeentry.st_progress.command_target = InvalidOid; lbeentry.st_query_id = UINT64CONST(0); /* diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 3876339ee1..fe09ae8f63 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -269,7 +269,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) * Report values for only those backends which are running the given * command. 
*/ - if (beentry->st_progress_command != cmdtype) + if (beentry->st_progress.command != cmdtype) continue; /* Value available to all callers */ @@ -279,9 +279,9 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) /* show rest of the values including relid only to role members */ if (HAS_PGSTAT_PERMISSIONS(beentry->st_userid)) { - values[2] = ObjectIdGetDatum(beentry->st_progress_command_target); + values[2] = ObjectIdGetDatum(beentry->st_progress.command_target); for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++) - values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]); + values[i + 3] = Int64GetDatum(beentry->st_progress.param[i]); } else { diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h index 7b63d38f97..e09598eafc 100644 --- a/src/include/utils/backend_progress.h +++ b/src/include/utils/backend_progress.h @@ -30,8 +30,22 @@ typedef enum ProgressCommandType PROGRESS_COMMAND_COPY, } ProgressCommandType; + #define PGSTAT_NUM_PROGRESS_PARAM 20 +/* + * Any command which wishes can advertise that it is running by setting + * command, command_target, and param[]. command_target should be the OID of + * the relation which the command targets (we assume there's just one, as this + * is meant for utility commands), but the meaning of each element in the + * param array is command-specific. + */ +typedef struct PgBackendProgress +{ + ProgressCommandType command; + Oid command_target; + int64 param[PGSTAT_NUM_PROGRESS_PARAM]; +} PgBackendProgress; extern void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid); diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index 7b7f6f59d0..11cdf7f95a 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -155,18 +155,8 @@ typedef struct PgBackendStatus */ char *st_activity_raw; - /* - * Command progress reporting. 
Any command which wishes can advertise - * that it is running by setting st_progress_command, - * st_progress_command_target, and st_progress_param[]. - * st_progress_command_target should be the OID of the relation which the - * command targets (we assume there's just one, as this is meant for - * utility commands), but the meaning of each element in the - * st_progress_param array is command-specific. - */ - ProgressCommandType st_progress_command; - Oid st_progress_command_target; - int64 st_progress_param[PGSTAT_NUM_PROGRESS_PARAM]; + /* Command progress reporting. */ + PgBackendProgress st_progress; /* query identifier, optionally computed using post_parse_analyze_hook */ uint64 st_query_id; -- 2.45.2 From 1cb536663c018d98faf349a680b773364b464026 Mon Sep 17 00:00:00 2001 From: Antonin Houska <ah@cybertec.at> Date: Tue, 9 Jul 2024 17:45:59 +0200 Subject: [PATCH 3/4] Add CONCURRENTLY option to both VACUUM FULL and CLUSTER commands. Both VACUUM FULL and CLUSTER commands copy the relation data into a new file, create new indexes and eventually swap the files. To make sure that the old file does not change during the copying, the relation is locked in an exclusive mode, which prevents applications from both reading and writing. (To keep the data consistent, we'd only need to prevent the applications from writing, but even reading needs to be blocked before we can swap the files - otherwise some applications could continue using the old file. Since we cannot get stronger lock without releasing the weaker one first, we acquire the exclusive lock in the beginning and keep it till the end of the processing.) This patch introduces an alternative workflow, which only requires the exclusive lock when the relation (and index) files are being swapped. (Supposedly, the swapping should be pretty fast.) On the other hand, when we copy the data to the new file, we allow applications to read from the relation and even write into it. 
First, we scan the relation using a "historic snapshot", and insert all the tuples satisfying this snapshot into the new file. Note that, before creating that snapshot, we need to make sure that all the other backends treat the relation as a system catalog: in particular, they must log information on new command IDs (CIDs). We achieve that by adding the relation ID into a shared hash table and waiting until all the transactions currently writing into the table (i.e. transactions possibly not aware of the new entry) have finished. Second, logical decoding is used to capture the data changes done by applications during the copying (i.e. changes that do not satisfy the historic snapshot mentioned above), and those are applied to the new file before we acquire the exclusive lock we need to swap the files. (Of course, more data changes can take place while we are waiting for the lock - these will be applied to the new file after we have acquired the lock, before we swap the files.) While copying the data into the new file, we hold a lock that prevents applications from changing the relation tuple descriptor (tuples inserted into the old file must fit into the new file). However, as we have to release that lock before getting the exclusive one, it's possible that someone adds or drops a column, or changes the data type of an existing one. Therefore we have to check the tuple descriptor before we swap the files. If we find out that the tuple descriptor changed, ERROR is raised and all the changes are rolled back. Since a lot of effort can be wasted in such a case, the ALTER TABLE command also tries to check if VACUUM FULL / CLUSTER with the CONCURRENTLY option is running on the same relation, and raises an ERROR if it is. Like the existing implementation of both VACUUM FULL and CLUSTER commands, the variant with the CONCURRENTLY option also requires an extra space for the new relation and index files (which coexist with the old files for some time). 
In addition, the CONCURRENTLY option might introduce a lag in releasing WAL segments for archiving / recycling. This is due to the decoding of the data changes done by applications concurrently. However, this lag should not be more than a single WAL segment. --- doc/src/sgml/monitoring.sgml | 36 +- doc/src/sgml/ref/cluster.sgml | 114 +- doc/src/sgml/ref/vacuum.sgml | 27 +- src/Makefile | 1 + src/backend/access/common/toast_internals.c | 3 +- src/backend/access/heap/heapam.c | 80 +- src/backend/access/heap/heapam_handler.c | 155 +- src/backend/access/heap/heapam_visibility.c | 30 +- src/backend/access/transam/xact.c | 52 + src/backend/catalog/index.c | 43 +- src/backend/catalog/system_views.sql | 17 +- src/backend/commands/cluster.c | 2618 ++++++++++++++++- src/backend/commands/matview.c | 2 +- src/backend/commands/tablecmds.c | 11 + src/backend/commands/vacuum.c | 137 +- src/backend/replication/logical/decode.c | 58 +- src/backend/replication/logical/snapbuild.c | 87 +- .../replication/pgoutput_cluster/Makefile | 32 + .../replication/pgoutput_cluster/meson.build | 18 + .../pgoutput_cluster/pgoutput_cluster.c | 321 ++ src/backend/storage/ipc/ipci.c | 3 + src/backend/tcop/utility.c | 11 + src/backend/utils/activity/backend_progress.c | 16 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/cache/inval.c | 22 + src/backend/utils/cache/relcache.c | 5 + src/backend/utils/time/snapmgr.c | 6 +- src/bin/psql/tab-complete.c | 5 +- src/include/access/heapam.h | 19 +- src/include/access/heapam_xlog.h | 2 + src/include/access/tableam.h | 10 + src/include/access/xact.h | 2 + src/include/catalog/index.h | 3 + src/include/commands/cluster.h | 117 +- src/include/commands/progress.h | 17 +- src/include/commands/vacuum.h | 17 +- src/include/replication/snapbuild.h | 2 + src/include/storage/lockdefs.h | 2 +- src/include/storage/lwlocklist.h | 1 + src/include/utils/backend_progress.h | 3 +- src/include/utils/inval.h | 2 + src/include/utils/rel.h | 7 +-
src/include/utils/snapmgr.h | 3 + src/test/regress/expected/rules.out | 17 +- 44 files changed, 3876 insertions(+), 259 deletions(-) create mode 100644 src/backend/replication/pgoutput_cluster/Makefile create mode 100644 src/backend/replication/pgoutput_cluster/meson.build create mode 100644 src/backend/replication/pgoutput_cluster/pgoutput_cluster.c diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 991f629907..fe1ba36f40 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -5567,14 +5567,35 @@ FROM pg_stat_get_backend_idset() AS backendid; <row> <entry role="catalog_table_entry"><para role="column_definition"> - <structfield>heap_tuples_written</structfield> <type>bigint</type> + <structfield>heap_tuples_inserted</structfield> <type>bigint</type> </para> <para> - Number of heap tuples written. + Number of heap tuples inserted. This counter only advances when the phase is <literal>seq scanning heap</literal>, - <literal>index scanning heap</literal> - or <literal>writing new heap</literal>. + <literal>index scanning heap</literal>, + <literal>writing new heap</literal> + or <literal>catch-up</literal>. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>heap_tuples_updated</structfield> <type>bigint</type> + </para> + <para> + Number of heap tuples updated. + This counter only advances when the phase is <literal>catch-up</literal>. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>heap_tuples_deleted</structfield> <type>bigint</type> + </para> + <para> + Number of heap tuples deleted. + This counter only advances when the phase is <literal>catch-up</literal>. </para></entry> </row> @@ -5655,6 +5676,13 @@ FROM pg_stat_get_backend_idset() AS backendid; <command>CLUSTER</command> is currently writing the new heap. 
</entry> </row> + <row> + <entry><literal>catch-up</literal></entry> + <entry> + <command>CLUSTER</command> is currently processing the DML commands + that other transactions executed during any of the preceding phases. + </entry> + </row> <row> <entry><literal>swapping relation files</literal></entry> <entry> diff --git a/doc/src/sgml/ref/cluster.sgml b/doc/src/sgml/ref/cluster.sgml index c5760244e6..0fe4e9603b 100644 --- a/doc/src/sgml/ref/cluster.sgml +++ b/doc/src/sgml/ref/cluster.sgml @@ -26,6 +26,7 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase> VERBOSE [ <replaceable class="parameter">boolean</replaceable> ] + CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ] </synopsis> </refsynopsisdiv> @@ -69,14 +70,18 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r <replaceable class="parameter">table_name</replaceable> reclusters all the previously-clustered tables in the current database that the calling user has privileges for. This form of <command>CLUSTER</command> cannot be - executed inside a transaction block. + executed inside a transaction block. Also, if + the <literal>CONCURRENTLY</literal> option is used with this form, system + catalogs and <acronym>TOAST</acronym> tables are not processed. </para> <para> - When a table is being clustered, an <literal>ACCESS - EXCLUSIVE</literal> lock is acquired on it. This prevents any other - database operations (both reads and writes) from operating on the - table until the <command>CLUSTER</command> is finished. + When a table is being clustered, an <literal>ACCESS EXCLUSIVE</literal> + lock is acquired on it. This prevents any other database operations (both + reads and writes) from operating on the table until + the <command>CLUSTER</command> is finished.
If you want to keep the table + accessible during the clustering, consider using + the <literal>CONCURRENTLY</literal> option. </para> </refsect1> @@ -111,6 +116,105 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r </listitem> </varlistentry> + <varlistentry> + <term><literal>CONCURRENTLY</literal></term> + <listitem> + <para> + Allow other transactions to use the table while it is being clustered. + </para> + + <para> + Internally, <command>CLUSTER</command> copies the contents of the table + (ignoring dead tuples) into a new file, sorted by the specified index, + and also creates a new file for each index. Then it swaps the old and + new files for the table and all the indexes, and deletes the old + files. The <literal>ACCESS EXCLUSIVE</literal> lock is needed to make + sure that the old files do not change during the processing because the + changes would get lost due to the swap. + </para> + + <para> + With the <literal>CONCURRENTLY</literal> option, the <literal>ACCESS + EXCLUSIVE</literal> lock is only acquired to swap the table and index + files. The data changes that took place during the creation of the new + table and index files are captured using logical decoding + (<xref linkend="logicaldecoding"/>) and applied before + the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock + is typically held only for the time needed to swap the files, which + should be pretty short. However, the time might still be noticeable + if too many data changes have been done to the table + while <command>CLUSTER</command> was waiting for the lock: those changes + must be processed before the files are swapped. + </para> + + <para> + Note that <command>CLUSTER</command> with + the <literal>CONCURRENTLY</literal> option does not try to order the + rows inserted into the table after the clustering started.
Also + note that <command>CLUSTER</command> might fail to complete due to DDL + commands executed on the table by other transactions during the + clustering. + </para> + + <note> + <para> + In addition to the temporary space requirements explained below, + the <literal>CONCURRENTLY</literal> option can increase the usage of + temporary space a bit more. The reason is that other transactions can + perform DML operations which cannot be applied to the new file until + <command>CLUSTER</command> has copied all the tuples from the old + file. Thus the tuples inserted into the old file during the copying are + also stored separately in a temporary file, so they can eventually + be applied to the new file. + </para> + + <para> + Furthermore, the data changes performed during the copying are + extracted from <link linkend="wal">write-ahead log</link> (WAL), and + this extraction (decoding) only takes place when a certain amount of WAL + has been written. Therefore, WAL removal can be delayed by this + threshold. Currently the threshold is equal to the value of + the <link linkend="guc-wal-segment-size"><varname>wal_segment_size</varname></link> + configuration parameter. + </para> + </note> + + <para> + The <literal>CONCURRENTLY</literal> option cannot be used in the + following cases: + + <itemizedlist> + <listitem> + <para> + The table is a system catalog or a <acronym>TOAST</acronym> table. + </para> + </listitem> + + <listitem> + <para> + <command>CLUSTER</command> is executed inside a transaction block. + </para> + </listitem> + + <listitem> + <para> + The <link linkend="guc-wal-level"><varname>wal_level</varname></link> + configuration parameter is less than <literal>logical</literal>. + </para> + </listitem> + + <listitem> + <para> + The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link> + configuration parameter does not allow for creation of an additional + replication slot.
+ </para> + </listitem> + </itemizedlist> + </para> + </listitem> + </varlistentry> + <varlistentry> <term><replaceable class="parameter">boolean</replaceable></term> <listitem> diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml index 9857b35627..298cf7298d 100644 --- a/doc/src/sgml/ref/vacuum.sgml +++ b/doc/src/sgml/ref/vacuum.sgml @@ -39,6 +39,7 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re SKIP_DATABASE_STATS [ <replaceable class="parameter">boolean</replaceable> ] ONLY_DATABASE_STATS [ <replaceable class="parameter">boolean</replaceable> ] BUFFER_USAGE_LIMIT <replaceable class="parameter">size</replaceable> + CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ] <phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase> @@ -61,8 +62,12 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re <para> Without a <replaceable class="parameter">table_and_columns</replaceable> list, <command>VACUUM</command> processes every table and materialized view - in the current database that the current user has permission to vacuum. - With a list, <command>VACUUM</command> processes only those table(s). + in the current database that the current user has permission to vacuum. If + the <literal>CONCURRENTLY</literal> option is specified (see below), tables which + have not been clustered yet are silently skipped. With a + list, <command>VACUUM</command> processes only those table(s). If + the <literal>CONCURRENTLY</literal> option is specified, the list may only contain + tables which have already been clustered. </para> <para> @@ -360,6 +365,24 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re </listitem> </varlistentry> + <varlistentry> + <term><literal>CONCURRENTLY</literal></term> + <listitem> + <para> + Allow other transactions to use the table while it is being vacuumed.
If + this option is specified, <command>VACUUM</command> can only process + tables which have already been clustered. For more information, see the + description of the <literal>CONCURRENTLY</literal> option of the + <xref linkend="sql-cluster"/> command. + </para> + + <para> + The <literal>CONCURRENTLY</literal> option can only be used + if <literal>FULL</literal> is used at the same time. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><replaceable class="parameter">boolean</replaceable></term> <listitem> diff --git a/src/Makefile b/src/Makefile index 2f31a2f20a..8b9d30ff72 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/replication/pgoutput_cluster \ fe_utils \ bin \ pl \ diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 90d0654e62..183055647b 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value, memcpy(VARDATA(&chunk_data), data_p, chunk_size); toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull); - heap_insert(toastrel, toasttup, mycid, options, NULL); + heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid, + options, NULL); /* * Create the index entry.
We cheat a little here by not using diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 91b20147a0..493c351d7f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -75,7 +75,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, - bool all_visible_cleared, bool new_all_visible_cleared); + bool all_visible_cleared, bool new_all_visible_cleared, + bool wal_logical); static Bitmapset *HeapDetermineColumnsInfo(Relation relation, Bitmapset *interesting_cols, Bitmapset *external_cols, @@ -1975,7 +1976,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) /* * heap_insert - insert tuple into a heap * - * The new tuple is stamped with current transaction ID and the specified + * The new tuple is stamped with specified transaction ID and the specified * command ID. * * See table_tuple_insert for comments about most of the input flags, except @@ -1991,15 +1992,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) * reflected into *tup. */ void -heap_insert(Relation relation, HeapTuple tup, CommandId cid, - int options, BulkInsertState bistate) +heap_insert(Relation relation, HeapTuple tup, TransactionId xid, + CommandId cid, int options, BulkInsertState bistate) { - TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; + Assert(TransactionIdIsValid(xid)); + /* Cheap, simplistic check that the tuple matches the rel's rowtype. */ Assert(HeapTupleHeaderGetNatts(tup->t_data) <= RelationGetNumberOfAttributes(relation)); @@ -2079,8 +2081,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* * If this is a catalog, we need to transmit combo CIDs to properly * decode, so log that as well. 
+ * + * Currently we only pass HEAP_INSERT_NO_LOGICAL when doing VACUUM + * FULL / CLUSTER, in which case the visibility information does not + * change. Therefore, there's no need to update the decoding snapshot. */ - if (RelationIsAccessibleInLogicalDecoding(relation)) + if ((options & HEAP_INSERT_NO_LOGICAL) == 0 && + RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, heaptup); /* @@ -2624,7 +2631,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, void simple_heap_insert(Relation relation, HeapTuple tup) { - heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL); + heap_insert(relation, tup, GetCurrentTransactionId(), + GetCurrentCommandId(true), 0, NULL); } /* @@ -2681,11 +2689,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) */ TM_Result heap_delete(Relation relation, ItemPointer tid, - CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, + TM_FailureData *tmfd, bool changingPart, + bool wal_logical) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; Page page; @@ -2702,6 +2710,7 @@ heap_delete(Relation relation, ItemPointer tid, bool old_key_copied = false; Assert(ItemPointerIsValid(tid)); + Assert(TransactionIdIsValid(xid)); /* * Forbid this during a parallel operation, lest it allocate a combo CID. @@ -2927,7 +2936,8 @@ l1: * Compute replica identity tuple before entering the critical section so * we don't PANIC upon a memory allocation failure. */ - old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied); + old_key_tuple = wal_logical ? 
+ ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL; /* * If this is the first possibly-multixact-able operation in the current @@ -2995,8 +3005,12 @@ l1: /* * For logical decode we need combo CIDs to properly decode the * catalog + * + * Like in heap_insert(), visibility is unchanged when called from + * VACUUM FULL / CLUSTER. */ - if (RelationIsAccessibleInLogicalDecoding(relation)) + if (wal_logical && + RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, &tp); xlrec.flags = 0; @@ -3017,6 +3031,15 @@ l1: xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } + /* + * Unlike UPDATE, DELETE is decoded even if there is no old key, so it + * does not help to clear both XLH_DELETE_CONTAINS_OLD_TUPLE and + * XLH_DELETE_CONTAINS_OLD_KEY. Thus we need an extra flag. TODO + * Consider not decoding tuples w/o the old tuple/key instead. + */ + if (!wal_logical) + xlrec.flags |= XLH_DELETE_NO_LOGICAL; + XLogBeginInsert(); XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); @@ -3106,10 +3129,11 @@ simple_heap_delete(Relation relation, ItemPointer tid) TM_Result result; TM_FailureData tmfd; - result = heap_delete(relation, tid, + result = heap_delete(relation, tid, GetCurrentTransactionId(), GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false, /* changingPart */ + true /* wal_logical */); switch (result) { case TM_SelfModified: @@ -3148,12 +3172,11 @@ simple_heap_delete(Relation relation, ItemPointer tid) */ TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, - CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, LockTupleMode *lockmode, - TU_UpdateIndexes *update_indexes) + TransactionId xid, CommandId cid, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, + TU_UpdateIndexes *update_indexes, bool wal_logical) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); Bitmapset 
*hot_attrs; Bitmapset *sum_attrs; Bitmapset *key_attrs; @@ -3193,6 +3216,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, infomask2_new_tuple; Assert(ItemPointerIsValid(otid)); + Assert(TransactionIdIsValid(xid)); /* Cheap, simplistic check that the tuple matches the rel's rowtype. */ Assert(HeapTupleHeaderGetNatts(newtup->t_data) <= @@ -3981,8 +4005,12 @@ l2: /* * For logical decoding we need combo CIDs to properly decode the * catalog. + * + * Like in heap_insert(), visibility is unchanged when called from + * VACUUM FULL / CLUSTER. */ - if (RelationIsAccessibleInLogicalDecoding(relation)) + if (wal_logical && + RelationIsAccessibleInLogicalDecoding(relation)) { log_heap_new_cid(relation, &oldtup); log_heap_new_cid(relation, heaptup); @@ -3992,7 +4020,8 @@ l2: newbuf, &oldtup, heaptup, old_key_tuple, all_visible_cleared, - all_visible_cleared_new); + all_visible_cleared_new, + wal_logical); if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); @@ -4225,10 +4254,10 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup, TM_FailureData tmfd; LockTupleMode lockmode; - result = heap_update(relation, otid, tup, + result = heap_update(relation, otid, tup, GetCurrentTransactionId(), GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode, update_indexes); + &tmfd, &lockmode, update_indexes, true); switch (result) { case TM_SelfModified: @@ -8357,7 +8386,8 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, - bool all_visible_cleared, bool new_all_visible_cleared) + bool all_visible_cleared, bool new_all_visible_cleared, + bool wal_logical) { xl_heap_update xlrec; xl_heap_header xlhdr; @@ -8368,10 +8398,12 @@ log_heap_update(Relation reln, Buffer oldbuf, suffixlen = 0; XLogRecPtr recptr; Page page = BufferGetPage(newbuf); - bool need_tuple_data = RelationIsLogicallyLogged(reln); + bool 
need_tuple_data; bool init; int bufflags; + need_tuple_data = RelationIsLogicallyLogged(reln) && wal_logical; + /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 6f8b1b7929..02fd6d2983 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -33,6 +33,7 @@ #include "catalog/index.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "commands/cluster.h" #include "commands/progress.h" #include "executor/executor.h" #include "miscadmin.h" @@ -53,6 +54,9 @@ static void reform_and_rewrite_tuple(HeapTuple tuple, static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, HeapTuple tuple, OffsetNumber tupoffset); +static bool accept_tuple_for_concurrent_copy(HeapTuple tuple, + Snapshot snapshot, + Buffer buffer); static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan); @@ -250,7 +254,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, tuple->t_tableOid = slot->tts_tableOid; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -273,7 +278,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, options |= HEAP_INSERT_SPECULATIVE; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -307,7 +313,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index 
tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + return heap_delete(relation, tid, GetCurrentTransactionId(), cid, + crosscheck, wait, tmfd, changingPart, true); } @@ -325,8 +332,9 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, slot->tts_tableOid = RelationGetRelid(relation); tuple->t_tableOid = slot->tts_tableOid; - result = heap_update(relation, otid, tuple, cid, crosscheck, wait, - tmfd, lockmode, update_indexes); + result = heap_update(relation, otid, tuple, GetCurrentTransactionId(), + cid, crosscheck, wait, + tmfd, lockmode, update_indexes, true); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); /* @@ -686,6 +694,8 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -706,6 +716,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, bool *isnull; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; + bool concurrent = snapshot != NULL; + XLogRecPtr end_of_wal_prev = GetFlushRecPtr(NULL); /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -786,6 +798,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, HeapTuple tuple; Buffer buf; bool isdead; + HTSV_Result vis; CHECK_FOR_INTERRUPTS(); @@ -840,7 +853,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, LockBuffer(buf, BUFFER_LOCK_SHARE); - switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf)) + switch ((vis = HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))) { case HEAPTUPLE_DEAD: /* Definitely dead */ @@ -856,14 +869,15 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, case HEAPTUPLE_INSERT_IN_PROGRESS: /* - * Since we hold exclusive lock on 
the relation, normally the - * only way to see this is if it was inserted earlier in our - * own transaction. However, it can happen in system + * As long as we hold exclusive lock on the relation, normally + * the only way to see this is if it was inserted earlier in + * our own transaction. However, it can happen in system * catalogs, since we tend to release write lock before commit - * there. Give a warning if neither case applies; but in any - * case we had better copy it. + * there. Also, there's no exclusive lock during concurrent + * processing. Give a warning if neither case applies; but in + * any case we had better copy it. */ - if (!is_system_catalog && + if (!is_system_catalog && !concurrent && !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) elog(WARNING, "concurrent insert in progress within table \"%s\"", RelationGetRelationName(OldHeap)); @@ -875,7 +889,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* * Similar situation to INSERT_IN_PROGRESS case. */ - if (!is_system_catalog && + if (!is_system_catalog && !concurrent && !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data))) elog(WARNING, "concurrent delete in progress within table \"%s\"", RelationGetRelationName(OldHeap)); @@ -889,8 +903,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, break; } - LockBuffer(buf, BUFFER_LOCK_UNLOCK); - if (isdead) { *tups_vacuumed += 1; @@ -901,9 +913,39 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, *tups_vacuumed += 1; *tups_recently_dead -= 1; } + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); continue; } + /* + * Ignore concurrent changes now, they'll be processed later via + * logical decoding. INSERT_IN_PROGRESS is rejected right away because + * our snapshot should represent a point in time which should precede + * (or be equal to) the state of transactions as it was when the + * "SatisfiesVacuum" test was performed. 
Thus + * accept_tuple_for_concurrent_copy() should not consider the tuple + * inserted. + */ + if (concurrent && + (vis == HEAPTUPLE_INSERT_IN_PROGRESS || + !accept_tuple_for_concurrent_copy(tuple, snapshot, buf))) + { + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + continue; + } + + /* + * In the concurrent case, we should not unlock the buffer until the + * tuple has been copied to the new file: if a concurrent transaction + * marked it updated or deleted in between, we'd fail to replay that + * transaction's changes because then we'd try to perform the same + * UPDATE / DELETE twice. XXX Should we instead create a copy of the + * tuple so that the buffer can be unlocked right away? + */ + if (!concurrent) + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + *num_tuples += 1; if (tuplesort != NULL) { @@ -920,7 +962,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, { const int ct_index[] = { PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED, - PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN + PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED }; int64 ct_val[2]; @@ -935,6 +977,35 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, ct_val[1] = *num_tuples; pgstat_progress_update_multi_param(2, ct_index, ct_val); } + + /* See the comment on unlocking above. */ + if (concurrent) + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + /* + * Process the WAL produced by the load, as well as by other + * transactions, so that the replication slot can advance and WAL does + * not pile up. Use wal_segment_size as a threshold so that we do not + * introduce the decoding overhead too often. + * + * Of course, we must not apply the changes until the initial load has + * completed. + * + * Note that our insertions into the new table should not be decoded + * as we (intentionally) do not write the logical decoding specific + * information to WAL. 
+ */ + if (concurrent) + { + XLogRecPtr end_of_wal; + + end_of_wal = GetFlushRecPtr(NULL); + if ((end_of_wal - end_of_wal_prev) > wal_segment_size) + { + cluster_decode_concurrent_changes(decoding_ctx, end_of_wal); + end_of_wal_prev = end_of_wal; + } + } } if (indexScan != NULL) @@ -978,7 +1049,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, values, isnull, rwstate); /* Report n_tuples */ - pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN, + pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED, n_tuples); } @@ -2583,6 +2654,56 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, } } +/* + * Check if the tuple was inserted, updated or deleted while + * heapam_relation_copy_for_cluster() was copying the data. + * + * 'snapshot' is used to determine whether xmin/xmax was set by a transaction + * that is still in-progress, or one that started in the future from the + * snapshot perspective. + * + * Returns true if the insertion is visible to 'snapshot', but clear xmax if + * it was set by a transaction which is in-progress or in the future from the + * snapshot perspective. (The xmax will be set later, when we decode the + * corresponding UPDATE / DELETE from WAL.) + * + * Returns false if the insertion is not visible to 'snapshot'. + */ +static bool +accept_tuple_for_concurrent_copy(HeapTuple tuple, Snapshot snapshot, + Buffer buffer) +{ + Assert(snapshot->snapshot_type == SNAPSHOT_MVCC); + + /* + * First, check if the tuple should be rejected because it was inserted + * concurrently. + */ + if (!HeapTupleMVCCInserted(tuple, snapshot, buffer)) + return false; + + /* + * If the tuple was deleted / updated but our snapshot still sees it, we + * need to keep it. In that case, clear the information that indicates the + * deletion / update. 
Otherwise the tuple chain would stay incomplete (as + * we will reject the new tuple above), and the delete / update would fail + * if executed later during logical decoding. + */ + if (TransactionIdIsNormal(HeapTupleHeaderGetRawXmax(tuple->t_data)) && + HeapTupleMVCCNotDeleted(tuple, snapshot, buffer)) + { + /* TODO More work needed here?*/ + tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(tuple->t_data, 0); + } + + /* + * Accept the tuple even if our snapshot considers it deleted - older + * snapshots can still see the tuple. + */ + return true; +} + /* ------------------------------------------------------------------------ * Definition of the heap table access method. diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index 9243feed01..d702592469 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -955,16 +955,31 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot, * did TransactionIdIsInProgress in each call --- to no avail, as long as the * inserting/deleting transaction was still running --- which was more cycles * and more contention on ProcArrayLock. + * + * The checks are split into two functions, HeapTupleMVCCInserted() and + * HeapTupleMVCCNotDeleted(), because they are also useful separately. */ static bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer) { - HeapTupleHeader tuple = htup->t_data; - Assert(ItemPointerIsValid(&htup->t_self)); Assert(htup->t_tableOid != InvalidOid); + return HeapTupleMVCCInserted(htup, snapshot, buffer) && + HeapTupleMVCCNotDeleted(htup, snapshot, buffer); +} + +/* + * HeapTupleMVCCInserted + * True iff heap tuple was successfully inserted for the given MVCC + * snapshot. 
+ */ +bool +HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot, Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; + if (!HeapTupleHeaderXminCommitted(tuple)) { if (HeapTupleHeaderXminInvalid(tuple)) @@ -1073,6 +1088,17 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, } /* by here, the inserting transaction has committed */ + return true; +} + +/* + * HeapTupleMVCCNotDeleted + * True iff heap tuple was not deleted for the given MVCC snapshot. + */ +bool +HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot, Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; if (tuple->t_infomask & HEAP_XMAX_INVALID) /* xid invalid or aborted */ return true; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d119ab909d..f9b8cb4da7 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -124,6 +124,18 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId}; static int nParallelCurrentXids = 0; static TransactionId *ParallelCurrentXids; +/* + * Another case that requires TransactionIdIsCurrentTransactionId() to behave + * specially is when CLUSTER CONCURRENTLY is processing data changes made in + * the old storage of a table by other transactions. When applying the changes + * to the new storage, the backend executing the CLUSTER command needs to act + * on behalf of those other transactions. The transactions responsible for the + * changes in the old storage are stored in this array, sorted by + * xidComparator. + */ +static int nClusterCurrentXids = 0; +static TransactionId *ClusterCurrentXids = NULL; + /* * Miscellaneous flag bits to record events which occur on the top level * transaction. 
These flags are only persisted in MyXactFlags and are intended @@ -970,6 +982,8 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) int low, high; + Assert(nClusterCurrentXids == 0); + low = 0; high = nParallelCurrentXids - 1; while (low <= high) @@ -989,6 +1003,21 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) return false; } + /* + * When executing CLUSTER CONCURRENTLY, the array of current transactions + * is given. + */ + if (nClusterCurrentXids > 0) + { + Assert(nParallelCurrentXids == 0); + + return bsearch(&xid, + ClusterCurrentXids, + nClusterCurrentXids, + sizeof(TransactionId), + xidComparator) != NULL; + } + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their @@ -5621,6 +5650,29 @@ EndParallelWorkerTransaction(void) CurrentTransactionState->blockState = TBLOCK_DEFAULT; } +/* + * SetClusterCurrentXids + * Set the XID array that TransactionIdIsCurrentTransactionId() should + * use. + */ +void +SetClusterCurrentXids(TransactionId *xip, int xcnt) +{ + ClusterCurrentXids = xip; + nClusterCurrentXids = xcnt; +} + +/* + * ResetClusterCurrentXids + * Undo the effect of SetClusterCurrentXids(). 
+ */ +void +ResetClusterCurrentXids(void) +{ + ClusterCurrentXids = NULL; + nClusterCurrentXids = 0; +} + /* * ShowTransactionState * Debug support diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index a819b4197c..a25c84d7ae 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1415,22 +1415,7 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) opclassOptions[i] = get_attoptions(oldIndexId, i + 1); - /* Extract statistic targets for each attribute */ - stattargets = palloc0_array(NullableDatum, newInfo->ii_NumIndexAttrs); - for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) - { - HeapTuple tp; - Datum dat; - - tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(oldIndexId), Int16GetDatum(i + 1)); - if (!HeapTupleIsValid(tp)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - i + 1, oldIndexId); - dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); - ReleaseSysCache(tp); - stattargets[i].value = dat; - stattargets[i].isnull = isnull; - } + stattargets = get_index_stattargets(oldIndexId, newInfo); /* * Now create the new index. 
@@ -1469,6 +1454,32 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, return newIndexId; } +NullableDatum * +get_index_stattargets(Oid indexid, IndexInfo *indInfo) +{ + NullableDatum *stattargets; + + /* Extract statistic targets for each attribute */ + stattargets = palloc0_array(NullableDatum, indInfo->ii_NumIndexAttrs); + for (int i = 0; i < indInfo->ii_NumIndexAttrs; i++) + { + HeapTuple tp; + Datum dat; + bool isnull; + + tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(indexid), Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + i + 1, indexid); + dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); + ReleaseSysCache(tp); + stattargets[i].value = dat; + stattargets[i].isnull = isnull; + } + + return stattargets; +} + /* * index_concurrently_build * diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9a47..fddab1cfa9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1236,16 +1236,19 @@ CREATE VIEW pg_stat_progress_cluster AS WHEN 2 THEN 'index scanning heap' WHEN 3 THEN 'sorting tuples' WHEN 4 THEN 'writing new heap' - WHEN 5 THEN 'swapping relation files' - WHEN 6 THEN 'rebuilding index' - WHEN 7 THEN 'performing final cleanup' + WHEN 5 THEN 'catch-up' + WHEN 6 THEN 'swapping relation files' + WHEN 7 THEN 'rebuilding index' + WHEN 8 THEN 'performing final cleanup' END AS phase, CAST(S.param3 AS oid) AS cluster_index_relid, S.param4 AS heap_tuples_scanned, - S.param5 AS heap_tuples_written, - S.param6 AS heap_blks_total, - S.param7 AS heap_blks_scanned, - S.param8 AS index_rebuild_count + S.param5 AS heap_tuples_inserted, + S.param6 AS heap_tuples_updated, + S.param7 AS heap_tuples_deleted, + S.param8 AS heap_blks_total, + S.param9 AS heap_blks_scanned, + S.param10 AS index_rebuild_count FROM pg_stat_get_progress_info('CLUSTER') AS S LEFT JOIN 
pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 194d143cf4..6397f7f8c4 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -25,6 +25,10 @@ #include "access/toast_internals.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/heap.h" @@ -32,6 +36,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/pg_am.h" +#include "catalog/pg_control.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" #include "catalog/toasting.h" @@ -40,10 +45,15 @@ #include "commands/progress.h" #include "commands/tablecmds.h" #include "commands/vacuum.h" +#include "executor/executor.h" #include "miscadmin.h" #include "optimizer/optimizer.h" #include "pgstat.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/snapbuild.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "utils/acl.h" @@ -57,6 +67,8 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" +typedef struct RewriteStateData *RewriteState; + /* * This struct is used to pass around the information on tables to be * clustered. We need this so we can make a list of them when invoked without @@ -68,17 +80,175 @@ typedef struct Oid indexOid; } RelToCluster; +/* + * The following definitions are used for concurrent processing. + */ + +/* + * OID of the table being processed by CLUSTER CONCURRENTLY by this backend. + */ +static Oid clustered_rel = InvalidOid; +/* The same for its TOAST relation. */ +static Oid clustered_rel_toast = InvalidOid; + +/* XXX Do we also need to mention VACUUM FULL CONCURRENTLY? 
*/ +#define CLUSTER_IN_PROGRESS_MESSAGE \ + "relation \"%s\" is already being processed by CLUSTER CONCURRENTLY" + +/* + * Everything we need to call ExecInsertIndexTuples(). + */ +typedef struct IndexInsertState +{ + ResultRelInfo *rri; + EState *estate; + ExprContext *econtext; + + Relation ident_index; +} IndexInsertState; -static void cluster_multiple_rels(List *rtcs, ClusterParams *params); -static void rebuild_relation(Relation OldHeap, Relation index, bool verbose); +/* + * Catalog information to check if another backend changed the relation in + * such a way that makes CLUSTER CONCURRENTLY unable to continue. Such changes + * are possible because cluster_rel() has to release its lock on the relation + * in order to acquire AccessExclusiveLock that it needs to swap the relation + * files. + * + * The most obvious problem is that the tuple descriptor has changed, since + * then the tuples we try to insert into the new storage are not guaranteed to + * fit into the storage. + * + * Another problem is that multiple backends might call cluster_rel(). This is + * not necessarily a correctness issue, but it definitely means wasted CPU + * time. + * + * Where possible, commands which might change the relation in an incompatible + * way should check if CLUSTER CONCURRENTLY is running, before they start to + * do the actual changes (see is_concurrent_cluster_in_progress()). Anything + * else must be caught by check_catalog_changes(), which uses this structure. + */ +typedef struct CatalogState +{ + /* Tuple descriptor of the relation. */ + TupleDesc tupdesc; + + /* The number of indexes tracked. */ + int ninds; + /* The index OIDs. */ + Oid *ind_oids; + /* The index tuple descriptors. */ + TupleDesc *ind_tupdescs; + + /* The following are copies of the corresponding fields of pg_class. */ + Oid reltoastrelid; + char relpersistence; + char replident; + + /* rd_replidindex */ + Oid replidindex; +} CatalogState; + +/* The WAL segment being decoded. 
*/ +static XLogSegNo cluster_current_segment = 0; + +static void cluster_multiple_rels(List *rtcs, ClusterParams *params, + LOCKMODE lock_mode, bool isTopLevel); +static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, + bool concurrent); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti); static List *get_tables_to_cluster(MemoryContext cluster_context); static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid); static bool cluster_is_permitted_for_relation(Oid relid, Oid userid); +static void check_concurrent_cluster_requirements(Relation rel, + bool isTopLevel, + bool isCluster); +static void begin_concurrent_cluster(Relation *rel_p, Relation *index_p, + bool *entered_p); +static void end_concurrent_cluster(Oid relid, bool error); +static void cluster_before_shmem_exit_callback(int code, Datum arg); +static CatalogState *get_catalog_state(Relation rel); +static void free_catalog_state(CatalogState *state); +static void check_catalog_changes(Relation rel, CatalogState *cat_state); +static LogicalDecodingContext *setup_logical_decoding(Oid relid, + const char *slotname, + TupleDesc tupdesc); +static HeapTuple get_changed_tuple(ConcurrentChange *change); +static void apply_concurrent_changes(ClusterDecodingState *dstate, + Relation rel, ScanKey key, int nkeys, + IndexInsertState *iistate); +static void apply_concurrent_insert(Relation rel, ConcurrentChange *change, + HeapTuple tup, IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_update(Relation rel, HeapTuple tup, + HeapTuple tup_target, + ConcurrentChange *change, + IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change); +static 
HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys, + HeapTuple tup_key, + Snapshot snapshot, + IndexInsertState *iistate, + TupleTableSlot *ident_slot, + IndexScanDesc *scan_p); +static void process_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal, + Relation rel_dst, + Relation rel_src, + ScanKey ident_key, + int ident_key_nentries, + IndexInsertState *iistate); +static IndexInsertState *get_index_insert_state(Relation relation, + Oid ident_index_id); +static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src, + int *nentries); +static void free_index_insert_state(IndexInsertState *iistate); +static void cleanup_logical_decoding(LogicalDecodingContext *ctx); +static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + CatalogState *cat_state, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti); +static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); + +/* + * Use this API when relation needs to be unlocked, closed and re-opened. If + * the relation got dropped while being unlocked, raise ERROR that mentions + * the relation name rather than OID. + */ +typedef struct RelReopenInfo +{ + /* + * The relation to be closed. Pointer to the value is stored here so that + * the user gets his reference updated automatically on re-opening. + * + * When calling unlock_and_close_relations(), 'relid' can be passed + * instead of 'rel_p' when the caller only needs to gather information for + * subsequent opening. + */ + Relation *rel_p; + Oid relid; + + char relkind; + LOCKMODE lockmode_orig; /* The existing lock mode */ + LOCKMODE lockmode_new; /* The lock mode after the relation is + * re-opened */ + char *relname; /* Relation name, initialized automatically. 
*/ +} RelReopenInfo; + +static void init_rel_reopen_info(RelReopenInfo *rri, Relation *rel_p, + Oid relid, LOCKMODE lockmode_orig, + LOCKMODE lockmode_new); +static void unlock_and_close_relations(RelReopenInfo *rels, int nrel); +static void reopen_relations(RelReopenInfo *rels, int nrel); /*--------------------------------------------------------------------------- * This cluster code allows for clustering multiple tables at once. Because @@ -110,10 +280,12 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) ListCell *lc; ClusterParams params = {0}; bool verbose = false; + bool concurrent = false; Relation rel = NULL; Oid indexOid = InvalidOid; MemoryContext cluster_context; List *rtcs; + LOCKMODE lock_mode; /* Parse option list */ foreach(lc, stmt->params) @@ -122,6 +294,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) if (strcmp(opt->defname, "verbose") == 0) verbose = defGetBoolean(opt); + else if (strcmp(opt->defname, "concurrently") == 0) + concurrent = defGetBoolean(opt); else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -130,20 +304,30 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) parser_errposition(pstate, opt->location))); } - params.options = (verbose ? CLUOPT_VERBOSE : 0); + params.options = + (verbose ? CLUOPT_VERBOSE : 0) | + (concurrent ? CLUOPT_CONCURRENT : 0); + + /* + * Determine the lock mode expected by cluster_rel(). + * + * In the exclusive case, we obtain AccessExclusiveLock right away to + * avoid lock-upgrade hazard in the single-transaction case. In the + * CONCURRENT case, the AccessExclusiveLock will only be used at the end + * of processing, supposedly for very short time. Until then, we'll have + * to unlock the relation temporarily, so there's no lock-upgrade hazard. + */ + lock_mode = (params.options & CLUOPT_CONCURRENT) == 0 ? + AccessExclusiveLock : LOCK_CLUSTER_CONCURRENT; if (stmt->relation != NULL) { /* This is the single-relation case. 
*/ Oid tableOid; - /* - * Find, lock, and check permissions on the table. We obtain - * AccessExclusiveLock right away to avoid lock-upgrade hazard in the - * single-transaction case. - */ + /* Find, lock, and check permissions on the table. */ tableOid = RangeVarGetRelidExtended(stmt->relation, - AccessExclusiveLock, + lock_mode, 0, RangeVarCallbackMaintainsTable, NULL); @@ -198,7 +382,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) * Do the job. (The function will close the relation, lock is kept * till commit.) */ - cluster_rel(rel, indexOid, &params); + cluster_rel(rel, indexOid, &params, isTopLevel); return; } @@ -237,7 +421,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) rtcs = get_tables_to_cluster_partitioned(cluster_context, indexOid); /* close relation, releasing lock on parent table */ - table_close(rel, AccessExclusiveLock); + table_close(rel, lock_mode); } else { @@ -246,7 +430,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) } /* Do the job. */ - cluster_multiple_rels(rtcs, &params); + cluster_multiple_rels(rtcs, &params, lock_mode, isTopLevel); /* Start a new transaction for the cleanup work. */ StartTransactionCommand(); @@ -263,7 +447,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) * return. */ static void -cluster_multiple_rels(List *rtcs, ClusterParams *params) +cluster_multiple_rels(List *rtcs, ClusterParams *params, LOCKMODE lock_mode, + bool isTopLevel) { ListCell *lc; @@ -283,13 +468,19 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params) /* functions in indexes may want a snapshot set */ PushActiveSnapshot(GetTransactionSnapshot()); - rel = table_open(rtc->tableOid, AccessExclusiveLock); + rel = table_open(rtc->tableOid, lock_mode); - /* - * Do the job. (The function will close the relation, lock is kept - * till commit.) - */ - cluster_rel(rel, rtc->indexOid, params); + /* Not all relations can be processed in the concurrent mode. 
*/ + if ((params->options & CLUOPT_CONCURRENT) == 0 || + check_relation_is_clusterable_concurrently(rel, DEBUG1, + "CLUSTER (CONCURRENTLY)")) + { + /* Do the job. (The function will close the relation, lock is kept + * till commit.) */ + cluster_rel(rel, rtc->indexOid, params, isTopLevel); + } + else + table_close(rel, lock_mode); PopActiveSnapshot(); CommitTransactionCommand(); @@ -313,10 +504,21 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params) * instead of index order. This is the new implementation of VACUUM FULL, * and error messages should refer to the operation as VACUUM not CLUSTER. * - * We expect that OldHeap is already locked in AccessExclusiveLock mode. + * We expect that OldHeap is already locked. The lock mode is + * AccessExclusiveLock for normal processing and LOCK_CLUSTER_CONCURRENT for + * concurrent processing (so that SELECT, INSERT, UPDATE and DELETE commands + * work, but cluster_rel() cannot be called concurrently for the same + * relation). + * + * Note that, in the concurrent case, the function releases the lock at some + * point, in order to get AccessExclusiveLock for the final steps (i.e. to + * swap the relation files). To make things simpler, the caller should expect + * OldHeap to be closed on return, regardless of CLUOPT_CONCURRENT. (The + * AccessExclusiveLock is kept till the end of the transaction.) */ void -cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) +cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, + bool isTopLevel) { Oid tableOid = RelationGetRelid(OldHeap); Oid save_userid; @@ -325,6 +527,41 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) bool verbose = ((params->options & CLUOPT_VERBOSE) != 0); bool recheck = ((params->options & CLUOPT_RECHECK) != 0); Relation index = NULL; + bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0); + LOCKMODE lmode; + bool entered, success; + + /* Check that the correct lock is held. */ + lmode = !concurrent ? 
AccessExclusiveLock : LOCK_CLUSTER_CONCURRENT; + + /* + * Skip the relation if it's being processed concurrently. In such a case, + * we cannot rely on a lock because the other backend needs to release it + * temporarily at some point. + * + * This check should not take place until we have a lock that prevents + * another backend from starting VACUUM FULL / CLUSTER CONCURRENTLY after + * our check. + */ + Assert(CheckRelationLockedByMe(OldHeap, lmode, false)); + if (is_concurrent_cluster_in_progress(tableOid)) + { + ereport(NOTICE, + (errmsg(CLUSTER_IN_PROGRESS_MESSAGE, + RelationGetRelationName(OldHeap)))); + table_close(OldHeap, lmode); + return; + } + + /* There are specific requirements on concurrent processing. */ + if (concurrent) + { + check_concurrent_cluster_requirements(OldHeap, isTopLevel, + OidIsValid(indexOid)); + + check_relation_is_clusterable_concurrently(OldHeap, ERROR, + "CLUSTER (CONCURRENTLY)"); + } /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); @@ -361,7 +598,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) /* Check that the user still has privileges for the relation */ if (!cluster_is_permitted_for_relation(tableOid, save_userid)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); goto out; } @@ -376,7 +613,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) */ if (RELATION_IS_OTHER_TEMP(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); goto out; } @@ -387,7 +624,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) */ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid))) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); goto out; } @@ -398,7 +635,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) if ((params->options & CLUOPT_RECHECK_ISCLUSTERED) != 0 && !get_index_isclustered(indexOid)) { - relation_close(OldHeap, 
AccessExclusiveLock); + relation_close(OldHeap, lmode); goto out; } } @@ -414,6 +651,11 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot cluster a shared catalog"))); + /* + * The CONCURRENT case should have been rejected earlier because it does + * not support system catalogs. + */ + Assert(!(OldHeap->rd_rel->relisshared && concurrent)); /* * Don't process temp tables of other backends ... their local buffer @@ -440,7 +682,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) /* Check heap and index are valid to cluster on */ if (OidIsValid(indexOid)) { - check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock); + check_index_is_clusterable(OldHeap, indexOid, lmode); /* Open the index (It should already be locked.) */ index = index_open(indexOid, NoLock); } @@ -455,7 +697,8 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW && !RelationIsPopulated(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + index_close(index, lmode); + relation_close(OldHeap, lmode); goto out; } @@ -468,11 +711,42 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params) * invalid, because we move tuples around. Promote them to relation * locks. Predicate locks on indexes will be promoted when they are * reindexed. + * + * During concurrent processing, the heap as well as its indexes stay in + * operation, so we postpone this step until they are locked using + * AccessExclusiveLock near the end of the processing. 
*/ - TransferPredicateLocksToHeapRelation(OldHeap); + if (!concurrent) + TransferPredicateLocksToHeapRelation(OldHeap); /* rebuild_relation does all the dirty work */ - rebuild_relation(OldHeap, index, verbose); + entered = false; + success = false; + PG_TRY(); + { + /* + * For concurrent processing, make sure other transactions treat this + * table as if it was a system / user catalog, and WAL the relevant + * additional information. ERROR is raised if another backend is + * processing the same table. + */ + if (concurrent) + { + Relation *index_p = index ? &index : NULL; + + begin_concurrent_cluster(&OldHeap, index_p, &entered); + } + + rebuild_relation(OldHeap, index, verbose, + (params->options & CLUOPT_CONCURRENT) != 0); + success = true; + } + PG_FINALLY(); + { + if (concurrent && entered) + end_concurrent_cluster(tableOid, !success); + } + PG_END_TRY(); /* * NB: rebuild_relation does table_close() on OldHeap, and also on index, @@ -622,18 +896,99 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal) table_close(pg_index, RowExclusiveLock); } +/* + * Check if the CONCURRENTLY option is legal for the relation. + */ +bool +check_relation_is_clusterable_concurrently(Relation rel, int elevel, + const char *stmt) +{ + char relpersistence, replident; + Oid ident_idx; + + /* Data changes in system relations are not logically decoded. 
*/ + if (IsCatalogRelation(rel)) + { + ereport(elevel, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + errhint("%s is not supported for catalog relations", stmt))); + return false; + } + + if (IsToastRelation(rel)) + { + ereport(elevel, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + errhint("%s is not supported for TOAST relations, unless the main relation is processed too", + stmt))); + return false; + } + + relpersistence = rel->rd_rel->relpersistence; + if (relpersistence != RELPERSISTENCE_PERMANENT) + { + ereport(elevel, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + errhint("CLUSTER CONCURRENTLY is only allowed for permanent relations"))); + return false; + } + + /* With NOTHING, WAL does not contain the old tuple. */ + replident = rel->rd_rel->relreplident; + if (replident == REPLICA_IDENTITY_NOTHING) + { + ereport(elevel, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + errhint("relation \"%s\" has insufficient replication identity", + RelationGetRelationName(rel)))); + return false; + } + + /* + * Identity index is not set if the replica identity is FULL, but PK might + * exist in such a case. 
+ */ + ident_idx = RelationGetReplicaIndex(rel); + if (!OidIsValid(ident_idx) && OidIsValid(rel->rd_pkindex)) + ident_idx = rel->rd_pkindex; + if (!OidIsValid(ident_idx)) + { + ereport(elevel, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + (errhint("relation \"%s\" has no identity index", + RelationGetRelationName(rel))))); + return false; + } + + return true; +} + /* * rebuild_relation: rebuild an existing relation in index or physical order * - * OldHeap: table to rebuild --- must be opened and exclusive-locked! + * OldHeap: table to rebuild --- must be opened and locked. See cluster_rel() + * for comments on the required lock strength. + * * index: index to cluster by, or NULL to rewrite in physical order. Must be * opened and locked. * * On exit, the heap (and also the index, if one was passed) are closed, but - * still locked with AccessExclusiveLock. + * still locked with AccessExclusiveLock. (The function handles the lock + * upgrade if 'concurrent' is true.) */ static void -rebuild_relation(Relation OldHeap, Relation index, bool verbose) +rebuild_relation(Relation OldHeap, Relation index, bool verbose, + bool concurrent) { Oid tableOid = RelationGetRelid(OldHeap); Oid indexOid = index ? RelationGetRelid(index) : InvalidOid; @@ -642,19 +997,83 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose) Oid OIDNewHeap; Relation NewHeap; char relpersistence; - bool is_system_catalog; bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; + NameData slotname; + LogicalDecodingContext *ctx = NULL; + Snapshot snapshot = NULL; + CatalogState *cat_state = NULL; LOCKMODE lmode_new; + if (concurrent) + { + TupleDesc tupdesc; + RelReopenInfo rri[2]; + int nrel; + + /* + * CLUSTER CONCURRENTLY is not allowed in a transaction block, so this + * should never fire. 
+ */ + Assert(GetTopTransactionIdIfAny() == InvalidTransactionId); + + /* + * A single backend should not execute multiple CLUSTER commands at a + * time, so use PID to make the slot unique. + */ + snprintf(NameStr(slotname), NAMEDATALEN, "cluster_%d", MyProcPid); + + /* + * Gather catalog information so that we can check later if the old + * relation has not changed while unlocked. + * + * Since this function also checks if the relation can be processed, + * it's important to call it before we setup the logical decoding, + * because that can take some time. Not sure if it's necessary to do + * it even earlier. + */ + cat_state = get_catalog_state(OldHeap); + + tupdesc = CreateTupleDescCopy(RelationGetDescr(OldHeap)); + + /* + * Unlock the relation (and possibly the clustering index) to avoid + * deadlock because setup_logical_decoding() will wait for all the + * running transactions (with XID assigned) to finish. Some of those + * transactions might be waiting for a lock on our relation. + */ + nrel = 0; + init_rel_reopen_info(&rri[nrel++], &OldHeap, InvalidOid, + LOCK_CLUSTER_CONCURRENT, + LOCK_CLUSTER_CONCURRENT); + if (index) + init_rel_reopen_info(&rri[nrel++], &index, InvalidOid, + LOCK_CLUSTER_CONCURRENT, + LOCK_CLUSTER_CONCURRENT); + unlock_and_close_relations(rri, nrel); + + /* Prepare to capture the concurrent data changes. */ + ctx = setup_logical_decoding(tableOid, NameStr(slotname), tupdesc); + + /* Lock the table (and index) again. */ + reopen_relations(rri, nrel); + + /* + * Check if a 'tupdesc' could have changed while the relation was + * unlocked. 
+ */ + check_catalog_changes(OldHeap, cat_state); + + snapshot = SnapBuildInitialSnapshotForCluster(ctx->snapshot_builder); + } + if (OidIsValid(indexOid)) /* Mark the correct index as clustered */ mark_index_clustered(OldHeap, indexOid, true); /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; - is_system_catalog = IsSystemRelation(OldHeap); /* * Create the transient table that will receive the re-ordered data. @@ -673,31 +1092,52 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose) AccessExclusiveLock : NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, verbose, + copy_table_data(NewHeap, OldHeap, index, snapshot, ctx, verbose, &swap_toast_by_content, &frozenXid, &cutoffMulti); + if (concurrent) + { + rebuild_relation_finish_concurrent(NewHeap, OldHeap, index, + cat_state, ctx, + swap_toast_by_content, + frozenXid, cutoffMulti); + + pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, + PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP); + + /* Done with decoding. */ + FreeSnapshot(snapshot); + free_catalog_state(cat_state); + cleanup_logical_decoding(ctx); + ReplicationSlotRelease(); + ReplicationSlotDrop(NameStr(slotname), false); + } + else + { + bool is_system_catalog = IsSystemRelation(OldHeap); - /* Close relcache entries, but keep lock until transaction commit */ - table_close(OldHeap, NoLock); - if (index) - index_close(index, NoLock); + /* Close relcache entries, but keep lock until transaction commit */ + table_close(OldHeap, NoLock); + if (index) + index_close(index, NoLock); - /* - * Close the new relation so it can be dropped as soon as the storage is - * swapped. The relation is not visible to others, so we could unlock it - * completely, but it's simpler to pass NoLock than to track all the locks - * acquired so far. 
- */ + table_close(NewHeap, NoLock); + /* + * Close the new relation so it can be dropped as soon as the storage + * is swapped. The relation is not visible to others, so we could + * unlock it completely, but it's simpler to pass NoLock than to track + * all the locks acquired so far. + */ + table_close(NewHeap, NoLock); - /* - * Swap the physical files of the target and transient tables, then - * rebuild the target's indexes and throw away the transient table. - */ - finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, - swap_toast_by_content, false, true, - frozenXid, cutoffMulti, - relpersistence); + /* + * Swap the physical files of the target and transient tables, then + * rebuild the target's indexes and throw away the transient table. + */ + finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, + swap_toast_by_content, false, true, true, + frozenXid, cutoffMulti, + relpersistence); + } } @@ -848,15 +1288,19 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, /* * Do the physical copying of table data. * + * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass + * iff concurrent processing is required. + * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. * *pCutoffMulti receives the MultiXactId used as a cutoff point. */ static void -copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, - bool *pSwapToastByContent, TransactionId *pFreezeXid, - MultiXactId *pCutoffMulti) +copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, + bool verbose, bool *pSwapToastByContent, + TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) { Oid OIDOldHeap = RelationGetRelid(OldHeap); Oid OIDOldIndex = OldIndex ? 
RelationGetRelid(OldIndex) : InvalidOid; @@ -876,6 +1320,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb int elevel = verbose ? INFO : DEBUG2; PGRUsage ru0; char *nspname; + bool concurrent = snapshot != NULL; pg_rusage_init(&ru0); @@ -902,8 +1347,12 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * * We don't need to open the toast relation here, just lock it. The lock * will be held till end of transaction. + * + * In the CONCURRENT case, the lock does not help because we need to + * release it temporarily at some point. Instead, we expect VACUUM / + * CLUSTER to skip tables which are present in ClusteredRelsHash. */ - if (OldHeap->rd_rel->reltoastrelid) + if (OldHeap->rd_rel->reltoastrelid && !concurrent) LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); /* @@ -979,7 +1428,45 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * provided, else plain seqscan. */ if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) + { + ResourceOwner oldowner = CurrentResourceOwner; + + /* + * In the CONCURRENT case, do the planning in a subtransaction so that + * we don't leave any additional locks behind us that we cannot + * release easily. + */ + if (concurrent) + { + Assert(CheckRelationLockedByMe(OldHeap, LOCK_CLUSTER_CONCURRENT, + false)); + Assert(CheckRelationLockedByMe(OldIndex, LOCK_CLUSTER_CONCURRENT, + false)); + BeginInternalSubTransaction("plan_cluster_use_sort"); + } + use_sort = plan_cluster_use_sort(OIDOldHeap, OIDOldIndex); + + if (concurrent) + { + PgBackendProgress progress; + + /* + * Command progress reporting gets terminated at subtransaction + * end. Save the status so it can be eventually restored. + */ + memcpy(&progress, &MyBEEntry->st_progress, + sizeof(PgBackendProgress)); + + /* Release the locks by aborting the subtransaction. 
*/ + RollbackAndReleaseCurrentSubTransaction(); + + /* Restore the progress reporting status. */ + pgstat_progress_restore_state(&progress); + + CurrentResourceOwner = oldowner; + } + } else use_sort = false; @@ -1008,7 +1495,9 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, &cutoffs.FreezeLimit, + cutoffs.OldestXmin, snapshot, + decoding_ctx, + &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, &tups_recently_dead); @@ -1017,7 +1506,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb *pFreezeXid = cutoffs.FreezeLimit; *pCutoffMulti = cutoffs.MultiXactCutoff; - /* Reset rd_toastoid just to be tidy --- it shouldn't be looked at again */ + /* + * Reset rd_toastoid just to be tidy --- it shouldn't be looked at + * again. In the CONCURRENT case, we need to set it again before applying + * the concurrent changes. + */ NewHeap->rd_toastoid = InvalidOid; num_pages = RelationGetNumberOfBlocks(NewHeap); @@ -1468,14 +1961,13 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool swap_toast_by_content, bool check_constraints, bool is_internal, + bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence) { ObjectAddress object; Oid mapped_tables[4]; - int reindex_flags; - ReindexParams reindex_params = {0}; int i; /* Report that we are now swapping relation files */ @@ -1501,39 +1993,46 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, if (is_system_catalog) CacheInvalidateCatalog(OIDOldHeap); - /* - * Rebuild each index on the relation (but not the toast table, which is - * all-new at this point). It is important to do this before the DROP - * step because if we are processing a system catalog that will be used - * during DROP, we want to have its indexes available. 
There is no - * advantage to the other order anyway because this is all transactional, - * so no chance to reclaim disk space before commit. We do not need a - * final CommandCounterIncrement() because reindex_relation does it. - * - * Note: because index_build is called via reindex_relation, it will never - * set indcheckxmin true for the indexes. This is OK even though in some - * sense we are building new indexes rather than rebuilding existing ones, - * because the new heap won't contain any HOT chains at all, let alone - * broken ones, so it can't be necessary to set indcheckxmin. - */ - reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; - if (check_constraints) - reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; + if (reindex) + { + int reindex_flags; + ReindexParams reindex_params = {0}; - /* - * Ensure that the indexes have the same persistence as the parent - * relation. - */ - if (newrelpersistence == RELPERSISTENCE_UNLOGGED) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; - else if (newrelpersistence == RELPERSISTENCE_PERMANENT) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; + /* + * Rebuild each index on the relation (but not the toast table, which + * is all-new at this point). It is important to do this before the + * DROP step because if we are processing a system catalog that will + * be used during DROP, we want to have its indexes available. There + * is no advantage to the other order anyway because this is all + * transactional, so no chance to reclaim disk space before commit. + * We do not need a final CommandCounterIncrement() because + * reindex_relation does it. + * + * Note: because index_build is called via reindex_relation, it will never + * set indcheckxmin true for the indexes. This is OK even though in some + * sense we are building new indexes rather than rebuilding existing ones, + * because the new heap won't contain any HOT chains at all, let alone + * broken ones, so it can't be necessary to set indcheckxmin. 
+ */ + reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; + if (check_constraints) + reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; - /* Report that we are now reindexing relations */ - pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, - PROGRESS_CLUSTER_PHASE_REBUILD_INDEX); + /* + * Ensure that the indexes have the same persistence as the parent + * relation. + */ + if (newrelpersistence == RELPERSISTENCE_UNLOGGED) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; + else if (newrelpersistence == RELPERSISTENCE_PERMANENT) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; - reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + /* Report that we are now reindexing relations */ + pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, + PROGRESS_CLUSTER_PHASE_REBUILD_INDEX); + + reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + } /* Report that we are now doing clean up */ pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, @@ -1773,3 +2272,1938 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid) get_rel_name(relid)))); return false; } + +#define REPL_PLUGIN_NAME "pgoutput_cluster" + +/* + * Each relation being processed by CLUSTER CONCURRENTLY must be in the + * clusteredRels hashtable. + */ +typedef struct ClusteredRel +{ + Oid relid; + Oid dbid; +} ClusteredRel; + +static HTAB *ClusteredRelsHash = NULL; + +/* Maximum number of entries in the hashtable. */ +static int maxClusteredRels = 0; + +Size +ClusterShmemSize(void) +{ + /* + * A replication slot is needed for the processing, so use this GUC to + * allocate memory for the hashtable. Reserve also space for TOAST + * relations. 
+ */ + maxClusteredRels = max_replication_slots * 2; + + return hash_estimate_size(maxClusteredRels, sizeof(ClusteredRel)); +} + +void +ClusterShmemInit(void) +{ + HASHCTL info; + + info.keysize = sizeof(ClusteredRel); + info.entrysize = info.keysize; + + ClusteredRelsHash = ShmemInitHash("Clustered Relations", + maxClusteredRels, + maxClusteredRels, + &info, + HASH_ELEM | HASH_BLOBS); +} + +/* + * Perform a preliminary check whether CLUSTER / VACUUM FULL CONCURRENTLY is + * possible. Note that here we only check things that should not change if we + * release the relation lock temporarily. The information that can change due + * to unlocking is checked in get_catalog_state(). + */ +static void +check_concurrent_cluster_requirements(Relation rel, bool isTopLevel, + bool isCluster) +{ + const char *stmt; + + if (isCluster) + stmt = "CLUSTER (CONCURRENTLY)"; + else + stmt = "VACUUM (FULL, CONCURRENTLY)"; + + /* + * Make sure we have no XID assigned, otherwise call of + * setup_logical_decoding() can cause a deadlock. + */ + PreventInTransactionBlock(isTopLevel, stmt); + + CheckSlotPermissions(); + + /* + * Use an existing function to check if we can use logical + * decoding. However note that RecoveryInProgress() should already have + * caused error, as it does for the non-concurrent VACUUM FULL / CLUSTER. + */ + CheckLogicalDecodingRequirements(); + + /* See ClusterShmemSize() */ + if (max_replication_slots < 2) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("%s requires \"max_replication_slots\" to be at least 2", + stmt))); +} + +/* + * Call this function before CLUSTER CONCURRENTLY starts to setup logical + * decoding. It makes sure that other users of the table put enough + * information into WAL. + * + * The point is that on various places we expect that the table we're + * processing is treated like a system catalog. 
For example, we need to be + * able to scan it using a "historic snapshot" anytime during the processing + * (as opposed to scanning only at the start point of the decoding, as logical + * replication does during initial table synchronization), in order to apply + * concurrent UPDATE / DELETE commands. + * + * Since we need to close and reopen the relation here, the 'rel_p' and + * 'index_p' arguments are in/out. + * + * 'entered_p' receives a bool value telling whether relation OID was entered + * into the hashtable or not. + */ +static void +begin_concurrent_cluster(Relation *rel_p, Relation *index_p, + bool *entered_p) +{ + Relation rel = *rel_p; + Oid relid, toastrelid; + ClusteredRel key, *entry; + bool found; + RelReopenInfo rri[2]; + int nrel; + static bool before_shmem_exit_callback_setup = false; + + relid = RelationGetRelid(rel); + + /* + * Make sure that we do not leave an entry in ClusteredRelsHash if exiting + * due to FATAL. + */ + if (!before_shmem_exit_callback_setup) + { + before_shmem_exit(cluster_before_shmem_exit_callback, 0); + before_shmem_exit_callback_setup = true; + } + + memset(&key, 0, sizeof(key)); + key.relid = relid; + key.dbid = MyDatabaseId; + + *entered_p = false; + LWLockAcquire(ClusteredRelsLock, LW_EXCLUSIVE); + entry = (ClusteredRel *) + hash_search(ClusteredRelsHash, &key, HASH_ENTER_NULL, &found); + if (found) + { + /* + * Since CLUSTER CONCURRENTLY takes ShareRowExclusiveLock, a conflict + * should occur much earlier. However that lock may be released + * temporarily, see below. Anyway, we should complain whatever the + * reason of the conflict might be. 
+ */ + ereport(ERROR, + (errmsg(CLUSTER_IN_PROGRESS_MESSAGE, + RelationGetRelationName(rel)))); + } + if (entry == NULL) + ereport(ERROR, + (errmsg("too many requests for CLUSTER CONCURRENTLY at a time")), + (errhint("consider increasing the \"max_replication_slots\" configuration parameter"))); + + /* + * Even if the insertion of TOAST relid should fail below, the caller has + * to do cleanup. + */ + *entered_p = true; + + /* + * Enable the callback to remove the entry in case of exit. We should not + * do this earlier, otherwise an attempt to insert already existing entry + * could make us remove that entry (inserted by another backend) during + * ERROR handling. + */ + Assert(!OidIsValid(clustered_rel)); + clustered_rel = relid; + + /* + * TOAST relation is not accessed using historic snapshot, but we enter it + * here to protect it from being VACUUMed by another backend. (Lock does + * not help in the CONCURRENT case because we cannot hold it continuously + * till the end of the transaction.) See the comments on locking TOAST + * relation in copy_table_data(). + */ + toastrelid = rel->rd_rel->reltoastrelid; + if (OidIsValid(toastrelid)) + { + key.relid = toastrelid; + entry = (ClusteredRel *) + hash_search(ClusteredRelsHash, &key, HASH_ENTER_NULL, &found); + if (found) + /* + * If we could enter the main fork the TOAST should succeed + * too. Nevertheless, check. + */ + ereport(ERROR, + (errmsg("TOAST relation of \"%s\" is already being processed by CLUSTER CONCURRENTLY", + RelationGetRelationName(rel)))); + if (entry == NULL) + ereport(ERROR, + (errmsg("too many requests for CLUSTER CONCURRENTLY at a time")), + (errhint("consider increasing the \"max_replication_slots\" configuration parameter"))); + + Assert(!OidIsValid(clustered_rel_toast)); + clustered_rel_toast = toastrelid; + } + LWLockRelease(ClusteredRelsLock); + + /* + * Make sure that other backends are aware of the new hash entry. 
+ * + * Besides sending the invalidation message, we need to force re-opening + * of the relation, which includes the actual invalidation (and thus + * checking of our hashtable on the next access). + */ + CacheInvalidateRelcacheImmediate(rel); + /* + * Since the hashtable only needs to be checked by write transactions, + * lock the relation in a mode that conflicts with any DML command. (The + * reading transactions are supposed to close the relation before opening + * it with higher lock.) Once we have the relation (and its index) locked, + * we unlock it immediately and then re-lock using the original mode. + */ + nrel = 0; + init_rel_reopen_info(&rri[nrel++], rel_p, InvalidOid, + LOCK_CLUSTER_CONCURRENT, ShareLock); + if (index_p) + { + /* + * Another transaction might want to open both the relation and the + * index. If it already has the relation lock and is waiting for the + * index lock, we should release the index lock, otherwise our request + * for ShareLock on the relation can end up in a deadlock. + */ + init_rel_reopen_info(&rri[nrel++], index_p, InvalidOid, + LOCK_CLUSTER_CONCURRENT, ShareLock); + } + unlock_and_close_relations(rri, nrel); + /* + * XXX It's not strictly necessary to lock the index here, but it's + * probably not worth teaching the "reopen API" about this special case. + */ + reopen_relations(rri, nrel); + + /* Switch back to the original lock. */ + nrel = 0; + init_rel_reopen_info(&rri[nrel++], rel_p, InvalidOid, + ShareLock, LOCK_CLUSTER_CONCURRENT); + if (index_p) + init_rel_reopen_info(&rri[nrel++], index_p, InvalidOid, + ShareLock, LOCK_CLUSTER_CONCURRENT); + unlock_and_close_relations(rri, nrel); + reopen_relations(rri, nrel); +} + +/* + * Call this when done with CLUSTER CONCURRENTLY. + * + * 'error' tells whether the function is being called in order to handle + * error. + */ +static void +end_concurrent_cluster(Oid relid, bool error) +{ + ClusteredRel key, *entry, *entry_toast = NULL; + + /* Remove the relation from the hash. 
*/ + memset(&key, 0, sizeof(key)); + key.relid = relid; + key.dbid = MyDatabaseId; + LWLockAcquire(ClusteredRelsLock, LW_EXCLUSIVE); + entry = hash_search(ClusteredRelsHash, &key, HASH_REMOVE, NULL); + + /* Disable cluster_before_shmem_exit_callback(). */ + if (OidIsValid(clustered_rel)) + clustered_rel = InvalidOid; + + /* Remove the TOAST relation if there is one. */ + if (OidIsValid(clustered_rel_toast)) + { + key.relid = clustered_rel_toast; + entry_toast = hash_search(ClusteredRelsHash, &key, HASH_REMOVE, + NULL); + + clustered_rel_toast = InvalidOid; + } + else + key.relid = InvalidOid; + LWLockRelease(ClusteredRelsLock); + + /* + * On normal completion (!error), we should not really fail to remove the + * entry. But if it did for any reason, make sure the transaction is + * aborted: if other transactions, while changing the contents of the + * relation, didn't know that CLUSTER CONCURRENTLY was in progress, they + * could have failed to write enough information to WAL, and thus we could + * have produced inconsistent table contents. + * + * On the other hand, if we are already handling an error, there's no + * reason to worry about inconsistent contents of the new storage because + * the transaction is going to be rolled back anyway. Furthermore, by + * raising ERROR here we'd shadow the original error. + */ + if (!error) + { + char *relname; + + if (entry == NULL) + { + relname = get_rel_name(relid); + if (!relname) + ereport(ERROR, + (errmsg("cache lookup failed for relation %u", + relid))); + + ereport(ERROR, + (errmsg("relation \"%s\" not found among clustered relations", + relname))); + } + + /* + * Missing TOAST relation indicates that it could have been VACUUMed + * or CLUSTERed by another backend while we did not hold a lock on it. 
+ */ + if (entry_toast == NULL && OidIsValid(key.relid)) + { + relname = get_rel_name(key.relid); + if (!relname) + ereport(ERROR, + (errmsg("cache lookup failed for relation %u", + key.relid))); + + ereport(ERROR, + (errmsg("relation \"%s\" not found among clustered relations", + relname))); + } + } + + /* + * Note: unlike begin_concurrent_cluster(), here we do not lock/unlock the + * relation: 1) On normal completion, the caller is already holding + * AccessExclusiveLock (till the end of the transaction), 2) on ERROR / + * FATAL, we try to do the cleanup asap, but the worst case is that other + * backends will write unnecessary information to WAL until they close the + * relation. + */ +} + +/* + * A wrapper to call end_concurrent_cluster() as a before_shmem_exit callback. + */ +static void +cluster_before_shmem_exit_callback(int code, Datum arg) +{ + if (OidIsValid(clustered_rel) || OidIsValid(clustered_rel_toast)) + end_concurrent_cluster(clustered_rel, true); +} + +/* + * Check if relation is currently being processed by CLUSTER CONCURRENTLY. + */ +bool +is_concurrent_cluster_in_progress(Oid relid) +{ + ClusteredRel key, *entry; + + memset(&key, 0, sizeof(key)); + key.relid = relid; + key.dbid = MyDatabaseId; + + LWLockAcquire(ClusteredRelsLock, LW_SHARED); + entry = (ClusteredRel *) + hash_search(ClusteredRelsHash, &key, HASH_FIND, NULL); + LWLockRelease(ClusteredRelsLock); + + return entry != NULL; +} + +/* + * Check if VACUUM FULL / CLUSTER CONCURRENTLY is already running for given + * relation, and if so, raise ERROR. The problem is that cluster_rel() needs + * to release its lock on the relation temporarily at some point, so our lock + * alone does not help. Commands that might break what cluster_rel() is doing + * should call this function first. + * + * Return without checking if lockmode allows for race conditions which would + * make the result meaningless. 
In that case, cluster_rel() itself should + * throw ERROR if the relation was changed by us in an incompatible + * way. However, if it managed to do most of its work by then, a lot of CPU + * time might be wasted. + */ +void +check_for_concurrent_cluster(Oid relid, LOCKMODE lockmode) +{ + /* + * If the caller does not have a lock that conflicts with + * LOCK_CLUSTER_CONCURRENT, the check makes little sense because the + * VACUUM FULL / CLUSTER CONCURRENTLY can start anytime after the check. + */ + if (lockmode < LOCK_CLUSTER_CONCURRENT) + return; + + if (is_concurrent_cluster_in_progress(relid)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg(CLUSTER_IN_PROGRESS_MESSAGE, + get_rel_name(relid)))); + +} + +/* + * Check if relation is eligible for CLUSTER CONCURRENTLY and retrieve the + * catalog state to be passed later to check_catalog_changes. + * + * Caller is supposed to hold (at least) LOCK_CLUSTER_CONCURRENT on the + * relation. + */ +static CatalogState * +get_catalog_state(Relation rel) +{ + CatalogState *result = palloc_object(CatalogState); + List *ind_oids; + ListCell *lc; + int ninds, i; + Oid reltoastrelid = rel->rd_rel->reltoastrelid; + char relpersistence = rel->rd_rel->relpersistence; + char replident = rel->rd_rel->relreplident; + Oid ident_idx = RelationGetReplicaIndex(rel); + TupleDesc td_src = RelationGetDescr(rel); + + /* + * While gathering the catalog information, check if there is a reason not + * to proceed. + */ + check_relation_is_clusterable_concurrently(rel, ERROR, + "CLUSTER (CONCURRENTLY)"); + + /* + * TOAST should not really change, but be careful. If it did, we would be + * unable to remove the new one from ClusteredRelsHash. + */ + if (OidIsValid(clustered_rel_toast) && + clustered_rel_toast != reltoastrelid) + ereport(ERROR, + (errmsg("TOAST relation changed by another transaction"))); + + /* No index should be dropped while we are checking it. 
*/ + Assert(CheckRelationLockedByMe(rel, ShareUpdateExclusiveLock, true)); + + ind_oids = RelationGetIndexList(rel); + result->ninds = ninds = list_length(ind_oids); + result->ind_oids = palloc_array(Oid, ninds); + result->ind_tupdescs = palloc_array(TupleDesc, ninds); + i = 0; + foreach(lc, ind_oids) + { + Oid ind_oid = lfirst_oid(lc); + Relation index; + TupleDesc td_src, td_dst; + + /* + * Weaker lock should be o.k. for the index, but this one should not + * break anything either. + */ + index = index_open(ind_oid, ShareUpdateExclusiveLock); + + result->ind_oids[i] = RelationGetRelid(index); + td_src = RelationGetDescr(index); + td_dst = palloc(TupleDescSize(td_src)); + TupleDescCopy(td_dst, td_src); + result->ind_tupdescs[i] = td_dst; + i++; + + index_close(index, ShareUpdateExclusiveLock); + } + + /* Fill-in the relation info. */ + result->tupdesc = palloc(TupleDescSize(td_src)); + TupleDescCopy(result->tupdesc, td_src); + result->reltoastrelid = reltoastrelid; + result->relpersistence = relpersistence; + result->replident = replident; + result->replidindex = ident_idx; + + return result; +} + +static void +free_catalog_state(CatalogState *state) +{ + /* We are only interested in indexes. */ + if (state->ninds == 0) + return; + + for (int i = 0; i < state->ninds; i++) + FreeTupleDesc(state->ind_tupdescs[i]); + + FreeTupleDesc(state->tupdesc); + pfree(state->ind_oids); + pfree(state->ind_tupdescs); + pfree(state); +} + +/* + * Raise ERROR if 'rel' changed in a way that does not allow further + * processing of CLUSTER CONCURRENTLY. + * + * Besides the relation's tuple descriptor, it's important to check indexes: + * concurrent change of index definition (can it happen in other way than + * dropping and re-creating the index, accidentally with the same OID?) can be + * a problem because we may already have the new index built. If an index was + * created or dropped concurrently, we'd fail to swap the index storage. 
In + * any case, we prefer to check the indexes early to get an explicit error + * message about the mismatch. Furthermore, the earlier we detect the change, + * the fewer CPU cycles we waste. + * + * Note that we do not check constraints because the transaction which changed + * them must have ensured that the existing tuples satisfy the new + * constraints. If any DML commands were necessary for that, we will simply + * decode them from WAL and apply them to the new storage. + * + * Caller is supposed to hold (at least) ShareUpdateExclusiveLock on the + * relation. + */ +static void +check_catalog_changes(Relation rel, CatalogState *cat_state) +{ + List *ind_oids; + ListCell *lc; + LOCKMODE lmode; + Oid ident_idx; + TupleDesc td, td_cp; + + /* First, check the relation info. */ + + /* TOAST is not easy to change, but check. */ + if (rel->rd_rel->reltoastrelid != cat_state->reltoastrelid) + ereport(ERROR, + errmsg("TOAST relation of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel))); + + if (rel->rd_rel->relpersistence != cat_state->relpersistence) + ereport(ERROR, + errmsg("persistence of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel))); + + if (cat_state->replident != rel->rd_rel->relreplident) + ereport(ERROR, + errmsg("replica identity of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel))); + + ident_idx = RelationGetReplicaIndex(rel); + if (ident_idx == InvalidOid && rel->rd_pkindex != InvalidOid) + ident_idx = rel->rd_pkindex; + if (cat_state->replidindex != ident_idx) + ereport(ERROR, + errmsg("identity index of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel))); + + /* + * As cat_state contains a copy (which has the constraint info cleared), + * create a temporary copy for the comparison. 
+ */ + td = RelationGetDescr(rel); + td_cp = palloc(TupleDescSize(td)); + TupleDescCopy(td_cp, td); + if (!equalTupleDescs(cat_state->tupdesc, td_cp)) + ereport(ERROR, + errmsg("definition of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel))); + FreeTupleDesc(td_cp); + + /* Now we are only interested in indexes. */ + if (cat_state->ninds == 0) + return; + + /* No index should be dropped while we are checking the relation. */ + lmode = ShareUpdateExclusiveLock; + Assert(CheckRelationLockedByMe(rel, lmode, true)); + + ind_oids = RelationGetIndexList(rel); + if (list_length(ind_oids) != cat_state->ninds) + goto failed_index; + + foreach(lc, ind_oids) + { + Oid ind_oid = lfirst_oid(lc); + int i; + TupleDesc tupdesc; + Relation index; + + /* Find the index in cat_state. */ + for (i = 0; i < cat_state->ninds; i++) + { + if (cat_state->ind_oids[i] == ind_oid) + break; + } + /* + * OID not found, i.e. the index was replaced by another one. XXX + * Should we yet try to find if an index having the desired tuple + * descriptor exists? Or should we always look for the tuple + * descriptor and not use OIDs at all? + */ + if (i == cat_state->ninds) + goto failed_index; + + /* Check the tuple descriptor. 
*/ + index = try_index_open(ind_oid, lmode); + if (index == NULL) + goto failed_index; + tupdesc = RelationGetDescr(index); + if (!equalTupleDescs(cat_state->ind_tupdescs[i], tupdesc)) + goto failed_index; + index_close(index, lmode); + } + + return; + +failed_index: + ereport(ERROR, + (errmsg("index(es) of relation \"%s\" changed by another transaction", + RelationGetRelationName(rel)))); +} + +/* + * This function is much like pg_create_logical_replication_slot() except that + * the new slot is neither released (if anyone else could read changes from + * our slot, we could miss changes other backends do while we copy the + * existing data into temporary table), nor persisted (it's easier to handle + * crash by restarting all the work from scratch). + * + * XXX Even though CreateInitDecodingContext() does not set state to + * RS_PERSISTENT, it does write the slot to disk. We rely on + * RestoreSlotFromDisk() to delete ephemeral slots during startup. (Both ERROR + * and FATAL should lead to cleanup even before the cluster goes down.) + */ +static LogicalDecodingContext * +setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc) +{ + LogicalDecodingContext *ctx; + ClusterDecodingState *dstate; + + /* RS_TEMPORARY so that the slot gets cleaned up on ERROR. */ + ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, false, false); + + /* + * Neither prepare_write nor do_write callback nor update_progress is + * useful for us. + * + * Regarding the value of need_full_snapshot, we pass false because the + * table we are processing is present in ClusteredRelsHash and therefore, + * regarding logical decoding, treated like a catalog. + */ + ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME, + NIL, + false, + InvalidXLogRecPtr, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); + + /* + * We don't have control on setting fast_forward, so at least check it. 
+ */ + Assert(!ctx->fast_forward); + + DecodingContextFindStartpoint(ctx); + + /* Some WAL records should have been read. */ + Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr); + + XLByteToSeg(ctx->reader->EndRecPtr, cluster_current_segment, + wal_segment_size); + + /* + * Setup structures to store decoded changes. + */ + dstate = palloc0(sizeof(ClusterDecodingState)); + dstate->relid = relid; + dstate->tstore = tuplestore_begin_heap(false, false, + maintenance_work_mem); + dstate->tupdesc = tupdesc; + + /* Initialize the descriptor to store the changes ... */ + dstate->tupdesc_change = CreateTemplateTupleDesc(1); + + TupleDescInitEntry(dstate->tupdesc_change, 1, NULL, BYTEAOID, -1, 0); + /* ... as well as the corresponding slot. */ + dstate->tsslot = MakeSingleTupleTableSlot(dstate->tupdesc_change, + &TTSOpsMinimalTuple); + + dstate->resowner = ResourceOwnerCreate(CurrentResourceOwner, + "logical decoding"); + + ctx->output_writer_private = dstate; + return ctx; +} + +/* + * Retrieve tuple from a change structure. As for the change, no alignment is + * assumed. + */ +static HeapTuple +get_changed_tuple(ConcurrentChange *change) +{ + HeapTupleData tup_data; + HeapTuple result; + char *src; + + /* + * Ensure alignment before accessing the fields. (This is why we can't use + * heap_copytuple() instead of this function.) + */ + memcpy(&tup_data, &change->tup_data, sizeof(HeapTupleData)); + + result = (HeapTuple) palloc(HEAPTUPLESIZE + tup_data.t_len); + memcpy(result, &tup_data, sizeof(HeapTupleData)); + result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE); + src = (char *) change + sizeof(ConcurrentChange); + memcpy(result->t_data, src, result->t_len); + + return result; +} + +/* + * Decode logical changes from the WAL sequence up to end_of_wal. 
+ */ +void +cluster_decode_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal) +{ + ClusterDecodingState *dstate; + ResourceOwner resowner_old; + PgBackendProgress progress; + + /* + * Invalidate the "present" cache before moving to "(recent) history". + */ + InvalidateSystemCaches(); + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + resowner_old = CurrentResourceOwner; + CurrentResourceOwner = dstate->resowner; + + /* + * reorderbuffer.c uses an internal subtransaction, whose abort ends the + * command progress reporting. Save the status here so we can restore it + * when done with the decoding. + */ + memcpy(&progress, &MyBEEntry->st_progress, sizeof(PgBackendProgress)); + + PG_TRY(); + { + while (ctx->reader->EndRecPtr < end_of_wal) + { + XLogRecord *record; + XLogSegNo segno_new; + char *errm = NULL; + XLogRecPtr end_lsn; + + record = XLogReadRecord(ctx->reader, &errm); + if (errm) + elog(ERROR, "%s", errm); + + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + /* + * If a WAL segment boundary has been crossed, inform the decoding + * system that the catalog_xmin can advance. (We could confirm more + * often, but filling a single WAL segment should not take much + * time.) + */ + end_lsn = ctx->reader->EndRecPtr; + XLByteToSeg(end_lsn, segno_new, wal_segment_size); + if (segno_new != cluster_current_segment) + { + LogicalConfirmReceivedLocation(end_lsn); + elog(DEBUG1, "cluster: confirmed receive location %X/%X", + (uint32) (end_lsn >> 32), (uint32) end_lsn); + cluster_current_segment = segno_new; + } + + CHECK_FOR_INTERRUPTS(); + } + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; + } + PG_CATCH(); + { + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Restore the progress reporting status. */ + pgstat_progress_restore_state(&progress); +} + +/* + * Apply changes that happened during the initial load.
+ * + * Scan key is passed by caller, so it does not have to be constructed + * multiple times. Key entries have all fields initialized, except for + * sk_argument. + */ +static void +apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel, + ScanKey key, int nkeys, IndexInsertState *iistate) +{ + TupleTableSlot *index_slot, *ident_slot; + HeapTuple tup_old = NULL; + + if (dstate->nchanges == 0) + return; + + /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */ + index_slot = MakeSingleTupleTableSlot(dstate->tupdesc, &TTSOpsHeapTuple); + iistate->econtext->ecxt_scantuple = index_slot; + + /* A slot to fetch tuples from identity index. */ + ident_slot = table_slot_create(rel, NULL); + + while (tuplestore_gettupleslot(dstate->tstore, true, false, + dstate->tsslot)) + { + bool shouldFree; + HeapTuple tup_change, + tup, + tup_exist; + char *change_raw; + ConcurrentChange *change; + Snapshot snapshot; + bool isnull[1]; + Datum values[1]; + + CHECK_FOR_INTERRUPTS(); + + /* Get the change from the single-column tuple. */ + tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree); + heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull); + Assert(!isnull[0]); + + /* This is bytea, but char* is easier to work with. */ + change_raw = (char *) DatumGetByteaP(values[0]); + + change = (ConcurrentChange *) VARDATA(change_raw); + + /* TRUNCATE change contains no tuple, so process it separately. */ + if (change->kind == CHANGE_TRUNCATE) + { + /* + * All the things that ExecuteTruncateGuts() does (such as firing + * triggers or handling the DROP_CASCADE behavior) should have + * taken place on the source relation. Thus we only do the actual + * truncation of the new relation (and its indexes). + */ + heap_truncate_one_rel(rel); + + pfree(tup_change); + continue; + } + + /* + * Extract the tuple from the change. 
The tuple is copied here because + * it might be assigned to 'tup_old', in which case it needs to + * survive into the next iteration. + */ + tup = get_changed_tuple(change); + + if (change->kind == CHANGE_UPDATE_OLD) + { + Assert(tup_old == NULL); + tup_old = tup; + } + else if (change->kind == CHANGE_INSERT) + { + Assert(tup_old == NULL); + + apply_concurrent_insert(rel, change, tup, iistate, index_slot); + + pfree(tup); + } + else if (change->kind == CHANGE_UPDATE_NEW || + change->kind == CHANGE_DELETE) + { + IndexScanDesc ind_scan = NULL; + HeapTuple tup_key; + + if (change->kind == CHANGE_UPDATE_NEW) + { + tup_key = tup_old != NULL ? tup_old : tup; + } + else + { + Assert(tup_old == NULL); + tup_key = tup; + } + + /* + * Find the tuple to be updated or deleted. + * + * As the table being CLUSTERed concurrently is considered a + * "user catalog", a new CID is WAL-logged and decoded. And since we + * use the same XID that the original DMLs did, the snapshot used + * for the logical decoding (by now converted to a non-historic + * MVCC snapshot) should see the tuples inserted previously into + * the new heap and/or updated there. + */ + snapshot = change->snapshot; + + /* + * Set what should be considered the current transaction (and + * subtransactions) during the visibility check. + * + * Note that this snapshot was created from a historic snapshot + * using SnapBuildMVCCFromHistoric(), which does not touch + * 'subxip'. Thus, unlike in a regular MVCC snapshot, the array + * does not contain (sub)transactions other than the one whose + * data changes we are applying.
+ */ + SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt); + + tup_exist = find_target_tuple(rel, key, nkeys, tup_key, snapshot, + iistate, ident_slot, &ind_scan); + if (tup_exist == NULL) + elog(ERROR, "Failed to find target tuple"); + + if (change->kind == CHANGE_UPDATE_NEW) + apply_concurrent_update(rel, tup, tup_exist, change, iistate, + index_slot); + else + apply_concurrent_delete(rel, tup_exist, change); + + ResetClusterCurrentXids(); + + if (tup_old != NULL) + { + pfree(tup_old); + tup_old = NULL; + } + + pfree(tup); + index_endscan(ind_scan); + } + else + elog(ERROR, "Unrecognized kind of change: %d", change->kind); + + /* Free the snapshot if this is the last change that needed it. */ + Assert(change->snapshot->active_count > 0); + change->snapshot->active_count--; + if (change->snapshot->active_count == 0) + { + if (change->snapshot == dstate->snapshot) + dstate->snapshot = NULL; + FreeSnapshot(change->snapshot); + } + + /* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */ + Assert(shouldFree); + pfree(tup_change); + } + + tuplestore_clear(dstate->tstore); + dstate->nchanges = 0; + + /* Cleanup. */ + ExecDropSingleTupleTableSlot(index_slot); + ExecDropSingleTupleTableSlot(ident_slot); +} + +static void +apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, + IndexInsertState *iistate, TupleTableSlot *index_slot) +{ + Snapshot snapshot = change->snapshot; + List *recheck; + + /* + * For INSERT, the visibility information is not important, but we use the + * snapshot to get CID. Index functions might need the whole snapshot + * anyway. + */ + SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt); + + /* + * Write the tuple into the new heap. + * + * The snapshot is the one we used to decode the insert (though converted + * to "non-historic" MVCC snapshot), i.e. the snapshot's curcid is the + * tuple CID incremented by one (due to the "new CID" WAL record that got + * written along with the INSERT record). 
Thus if we want to use the + * original CID, we need to subtract 1 from curcid. + */ + Assert(snapshot->curcid != InvalidCommandId && + snapshot->curcid > FirstCommandId); + + heap_insert(rel, tup, change->xid, snapshot->curcid - 1, + HEAP_INSERT_NO_LOGICAL, NULL); + + /* + * Update indexes. + * + * Push the snapshot in case functions in the index need the active + * snapshot and the caller hasn't set one. + */ + PushActiveSnapshot(snapshot); + ExecStoreHeapTuple(tup, index_slot, false); + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + false, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + false /* onlySummarizing */ + ); + PopActiveSnapshot(); + ResetClusterCurrentXids(); + + /* + * If recheck is required, it must have been performed on the source + * relation by now. (All the logical changes we process here are already + * committed.) + */ + list_free(recheck); + + pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED, 1); +} + +static void +apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, + ConcurrentChange *change, IndexInsertState *iistate, + TupleTableSlot *index_slot) +{ + List *recheck; + LockTupleMode lockmode; + TU_UpdateIndexes update_indexes; + ItemPointerData tid_old_new_heap; + TM_Result res; + Snapshot snapshot = change->snapshot; + TM_FailureData tmfd; + + /* Location of the existing tuple in the new heap. */ + ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap); + + /* + * Write the new tuple into the new heap. ('tup' gets the TID assigned + * here.) + * + * Regarding CID, see the comment in apply_concurrent_insert().
+ */ + Assert(snapshot->curcid != InvalidCommandId && + snapshot->curcid > FirstCommandId); + + res = heap_update(rel, &tid_old_new_heap, tup, + change->xid, snapshot->curcid - 1, + InvalidSnapshot, + false, /* no wait - only we are doing changes */ + &tmfd, &lockmode, &update_indexes, + /* wal_logical */ + false); + if (res != TM_Ok) + ereport(ERROR, (errmsg("failed to apply concurrent UPDATE"))); + + ExecStoreHeapTuple(tup, index_slot, false); + + if (update_indexes != TU_None) + { + PushActiveSnapshot(snapshot); + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + true, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + /* onlySummarizing */ + update_indexes == TU_Summarizing); + PopActiveSnapshot(); + list_free(recheck); + } + + pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_UPDATED, 1); +} + +static void +apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change) +{ + ItemPointerData tid_old_new_heap; + TM_Result res; + TM_FailureData tmfd; + Snapshot snapshot = change->snapshot; + + /* Regarding CID, see the comment in apply_concurrent_insert(). */ + Assert(snapshot->curcid != InvalidCommandId && + snapshot->curcid > FirstCommandId); + + /* Location of the existing tuple in the new heap. */ + ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap); + + res = heap_delete(rel, &tid_old_new_heap, change->xid, + snapshot->curcid - 1, InvalidSnapshot, false, + &tmfd, false, + /* wal_logical */ + false); + + if (res != TM_Ok) + ereport(ERROR, (errmsg("failed to apply concurrent DELETE"))); + + pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_DELETED, 1); +} + +/* + * Find the tuple to be updated or deleted. + * + * 'key' is a pre-initialized scan key, into which the function will put the + * key values. + * + * 'tup_key' is a tuple containing the key values for the scan. + * + * On exit,'*scan_p' contains the scan descriptor used. 
The caller must close + * it when it no longer needs the tuple returned. + */ +static HeapTuple +find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, + Snapshot snapshot, IndexInsertState *iistate, + TupleTableSlot *ident_slot, IndexScanDesc *scan_p) +{ + IndexScanDesc scan; + Form_pg_index ident_form; + int2vector *ident_indkey; + HeapTuple result = NULL; + + scan = index_beginscan(rel, iistate->ident_index, snapshot, + nkeys, 0); + *scan_p = scan; + index_rescan(scan, key, nkeys, NULL, 0); + + /* Info needed to retrieve key values from heap tuple. */ + ident_form = iistate->ident_index->rd_index; + ident_indkey = &ident_form->indkey; + + /* Use the incoming tuple to finalize the scan key. */ + for (int i = 0; i < scan->numberOfKeys; i++) + { + ScanKey entry; + bool isnull; + int16 attno_heap; + + entry = &scan->keyData[i]; + attno_heap = ident_indkey->values[i]; + entry->sk_argument = heap_getattr(tup_key, + attno_heap, + rel->rd_att, + &isnull); + Assert(!isnull); + } + if (index_getnext_slot(scan, ForwardScanDirection, ident_slot)) + { + bool shouldFree; + + result = ExecFetchSlotHeapTuple(ident_slot, false, &shouldFree); + /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */ + Assert(!shouldFree); + } + + return result; +} + +/* + * Decode and apply concurrent changes. + * + * Pass rel_src iff its reltoastrelid is needed.
+ */ +static void +process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal, + Relation rel_dst, Relation rel_src, ScanKey ident_key, + int ident_key_nentries, IndexInsertState *iistate) +{ + ClusterDecodingState *dstate; + + pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, + PROGRESS_CLUSTER_PHASE_CATCH_UP); + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + + cluster_decode_concurrent_changes(ctx, end_of_wal); + + if (dstate->nchanges == 0) + return; + + PG_TRY(); + { + /* + * Make sure that TOAST values can eventually be accessed via the old + * relation - see comment in copy_table_data(). + */ + if (rel_src) + rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid; + + apply_concurrent_changes(dstate, rel_dst, ident_key, + ident_key_nentries, iistate); + } + PG_FINALLY(); + { + ResetClusterCurrentXids(); + + if (rel_src) + rel_dst->rd_toastoid = InvalidOid; + } + PG_END_TRY(); +} + +static IndexInsertState * +get_index_insert_state(Relation relation, Oid ident_index_id) +{ + EState *estate; + int i; + IndexInsertState *result; + + result = (IndexInsertState *) palloc0(sizeof(IndexInsertState)); + estate = CreateExecutorState(); + result->econtext = GetPerTupleExprContext(estate); + + result->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); + InitResultRelInfo(result->rri, relation, 0, 0, 0); + ExecOpenIndices(result->rri, false); + + /* + * Find the relcache entry of the identity index so that we spend no extra + * effort to open / close it. + */ + for (i = 0; i < result->rri->ri_NumIndices; i++) + { + Relation ind_rel; + + ind_rel = result->rri->ri_IndexRelationDescs[i]; + if (ind_rel->rd_id == ident_index_id) + result->ident_index = ind_rel; + } + if (result->ident_index == NULL) + elog(ERROR, "Failed to open identity index"); + + /* Only initialize fields needed by ExecInsertIndexTuples(). */ + result->estate = estate; + + return result; +} + +/* + * Build scan key to process logical changes. 
+ */ +static ScanKey +build_identity_key(Oid ident_idx_oid, Relation rel_src, int *nentries) +{ + Relation ident_idx_rel; + Form_pg_index ident_idx; + int n, + i; + ScanKey result; + + Assert(OidIsValid(ident_idx_oid)); + ident_idx_rel = index_open(ident_idx_oid, AccessShareLock); + ident_idx = ident_idx_rel->rd_index; + n = ident_idx->indnatts; + result = (ScanKey) palloc(sizeof(ScanKeyData) * n); + for (i = 0; i < n; i++) + { + ScanKey entry; + int16 relattno; + Form_pg_attribute att; + Oid opfamily, + opcintype, + opno, + opcode; + + entry = &result[i]; + relattno = ident_idx->indkey.values[i]; + if (relattno >= 1) + { + TupleDesc desc; + + desc = rel_src->rd_att; + att = TupleDescAttr(desc, relattno - 1); + } + else + elog(ERROR, "Unexpected attribute number %d in index", relattno); + + opfamily = ident_idx_rel->rd_opfamily[i]; + opcintype = ident_idx_rel->rd_opcintype[i]; + opno = get_opfamily_member(opfamily, opcintype, opcintype, + BTEqualStrategyNumber); + + if (!OidIsValid(opno)) + elog(ERROR, "Failed to find = operator for type %u", opcintype); + + opcode = get_opcode(opno); + if (!OidIsValid(opcode)) + elog(ERROR, "Failed to find = operator for operator %u", opno); + + /* Initialize everything but argument. 
*/ + ScanKeyInit(entry, + i + 1, + BTEqualStrategyNumber, opcode, + (Datum) NULL); + entry->sk_collation = att->attcollation; + } + index_close(ident_idx_rel, AccessShareLock); + + *nentries = n; + return result; +} + +static void +free_index_insert_state(IndexInsertState *iistate) +{ + ExecCloseIndices(iistate->rri); + FreeExecutorState(iistate->estate); + pfree(iistate->rri); + pfree(iistate); +} + +static void +cleanup_logical_decoding(LogicalDecodingContext *ctx) +{ + ClusterDecodingState *dstate; + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + + ExecDropSingleTupleTableSlot(dstate->tsslot); + FreeTupleDesc(dstate->tupdesc_change); + FreeTupleDesc(dstate->tupdesc); + tuplestore_end(dstate->tstore); + + FreeDecodingContext(ctx); +} + +/* + * The final steps of rebuild_relation() for concurrent processing. + * + * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its + * clustering index (if one is passed) are still locked in a mode that allows + * concurrent data changes. On exit, both tables and their indexes are closed, + * but locked in AccessExclusiveLock mode. + */ +static void +rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + CatalogState *cat_state, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti) +{ + LOCKMODE lmode_old; + List *ind_oids_new; + Oid old_table_oid = RelationGetRelid(OldHeap); + Oid new_table_oid = RelationGetRelid(NewHeap); + List *ind_oids_old = RelationGetIndexList(OldHeap); + ListCell *lc, *lc2; + char relpersistence; + bool is_system_catalog; + Oid ident_idx_old, ident_idx_new; + IndexInsertState *iistate; + ScanKey ident_key; + int ident_key_nentries; + XLogRecPtr wal_insert_ptr, end_of_wal; + char dummy_rec_data = '\0'; + RelReopenInfo *rri = NULL; + int nrel; + Relation *ind_refs_all, *ind_refs_p; + + /* Like in cluster_rel(). 
*/ + lmode_old = LOCK_CLUSTER_CONCURRENT; + Assert(CheckRelationLockedByMe(OldHeap, lmode_old, false)); + Assert(cl_index == NULL || + CheckRelationLockedByMe(cl_index, lmode_old, false)); + /* This is expected from the caller. */ + Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false)); + + ident_idx_old = RelationGetReplicaIndex(OldHeap); + + /* + * Unlike the exclusive case, we build new indexes for the new relation + * rather than swapping the storage and reindexing the old relation. The + * point is that the index build can take some time, so we do it before we + * get AccessExclusiveLock on the old heap and therefore we cannot swap + * the heap storage yet. + * + * index_create() will lock the new indexes using AccessExclusiveLock + * on creation - no need to change that. + */ + ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old); + + /* + * Processing shouldn't start without a valid identity index. + */ + Assert(OidIsValid(ident_idx_old)); + + /* Find the "identity index" on the new relation. */ + ident_idx_new = InvalidOid; + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + + if (ident_idx_old == ind_old) + { + ident_idx_new = ind_new; + break; + } + } + if (!OidIsValid(ident_idx_new)) + /* + * Should not happen, given our lock on the old relation. + */ + ereport(ERROR, + (errmsg("Identity index missing on the new relation"))); + + /* Executor state to update indexes. */ + iistate = get_index_insert_state(NewHeap, ident_idx_new); + + /* + * Build the scan key that we'll use to look for rows to be updated / + * deleted during logical decoding. + */ + ident_key = build_identity_key(ident_idx_new, OldHeap, &ident_key_nentries); + + /* + * Flush all WAL records inserted so far (possibly except for the last + * incomplete page, see GetInsertRecPtr), to minimize the amount of data + * we need to flush while holding exclusive lock on the source table.
+ */ + wal_insert_ptr = GetInsertRecPtr(); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* + * Apply concurrent changes first time, to minimize the time we need to + * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * written during the data copying and index creation.) + */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* + * Release the locks that allowed concurrent data changes, in order to + * acquire the AccessExclusiveLock. + */ + nrel = 0; + /* + * We unlock the old relation (and its clustering index), but then we will + * lock the relation and *all* its indexes because we want to swap their + * storage. + * + * (NewHeap is already locked, as well as its indexes.) + */ + rri = palloc_array(RelReopenInfo, 1 + list_length(ind_oids_old)); + init_rel_reopen_info(&rri[nrel++], &OldHeap, InvalidOid, + LOCK_CLUSTER_CONCURRENT, AccessExclusiveLock); + /* References to the re-opened indexes will be stored in this array. */ + ind_refs_all = palloc_array(Relation, list_length(ind_oids_old)); + ind_refs_p = ind_refs_all; + /* The clustering index is a special case. */ + if (cl_index) + { + *ind_refs_p = cl_index; + init_rel_reopen_info(&rri[nrel], ind_refs_p, InvalidOid, + LOCK_CLUSTER_CONCURRENT, AccessExclusiveLock); + nrel++; + ind_refs_p++; + } + /* + * Initialize also the entries for the other indexes (currently unlocked) + * because we will have to lock them. + */ + foreach(lc, ind_oids_old) + { + Oid ind_oid; + + ind_oid = lfirst_oid(lc); + /* Clustering index is already in the array, or there is none. */ + if (cl_index && RelationGetRelid(cl_index) == ind_oid) + continue; + + Assert(nrel < (1 + list_length(ind_oids_old))); + + *ind_refs_p = NULL; + init_rel_reopen_info(&rri[nrel], + /* + * In this special case we do not have the + * relcache reference, use OID instead. 
+ */ + ind_refs_p, + ind_oid, + NoLock, /* Nothing to unlock. */ + AccessExclusiveLock); + + nrel++; + ind_refs_p++; + } + /* Perform the actual unlocking and re-locking. */ + unlock_and_close_relations(rri, nrel); + reopen_relations(rri, nrel); + + /* + * In addition, lock the OldHeap's TOAST relation that we skipped for the + * CONCURRENTLY option in copy_table_data(). This lock will be needed to + * swap the relation files. + */ + if (OidIsValid(OldHeap->rd_rel->reltoastrelid)) + LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); + + /* + * Check if the new indexes match the old ones, i.e. no changes occurred + * while OldHeap was unlocked. + * + * XXX It's probably not necessary to check the relation tuple descriptor + * here because the logical decoding was already active when we released + * the lock, and thus the corresponding data changes won't be lost. + * However processing of those changes might take a lot of time. + */ + check_catalog_changes(OldHeap, cat_state); + + /* + * Tuples and pages of the old heap will be gone, but the heap will stay. + */ + TransferPredicateLocksToHeapRelation(OldHeap); + /* The same for indexes. */ + for (int i = 0; i < (nrel - 1); i++) + { + Relation index = ind_refs_all[i]; + + TransferPredicateLocksToHeapRelation(index); + + /* + * References to indexes on the old relation are not needed anymore, + * however locks stay till the end of the transaction. + */ + index_close(index, NoLock); + } + pfree(ind_refs_all); + + /* + * Flush anything we see in WAL, to make sure that all changes committed + * while we were waiting for the exclusive lock are available for + * decoding. This should not be necessary if all backends had + * synchronous_commit set, but we can't rely on this setting. + * + * Unfortunately, GetInsertRecPtr() may lag behind the actual insert + * position, and GetLastImportantRecPtr() points at the start of the last + * record rather than at the end. 
Thus the simplest way to determine the + * insert position is to insert a dummy record and use its LSN. + * + * XXX Consider using GetLastImportantRecPtr() and adding the size of the + * last record (plus the total size of all the page headers the record + * spans)? + */ + XLogBeginInsert(); + XLogRegisterData(&dummy_rec_data, 1); + wal_insert_ptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* Apply the concurrent changes again. */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* Remember info about rel before closing OldHeap */ + relpersistence = OldHeap->rd_rel->relpersistence; + is_system_catalog = IsSystemRelation(OldHeap); + + pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, + PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES); + + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + Oid mapped_tables[4]; + + /* Zero out possible results from swapped_relation_files */ + memset(mapped_tables, 0, sizeof(mapped_tables)); + + swap_relation_files(ind_old, ind_new, + (old_table_oid == RelationRelationId), + swap_toast_by_content, + true, + InvalidTransactionId, + InvalidMultiXactId, + mapped_tables); + +#ifdef USE_ASSERT_CHECKING + /* + * Concurrent processing is not supported for system relations, so + * there should be no mapped tables. + */ + for (int i = 0; i < 4; i++) + Assert(mapped_tables[i] == 0); +#endif + } + + /* The new indexes must be visible for deletion. */ + CommandCounterIncrement(); + + /* Close the old heap but keep lock until transaction commit. */ + table_close(OldHeap, NoLock); + /* Close the new heap. (We didn't have to open its indexes). */ + table_close(NewHeap, NoLock); + + /* Cleanup what we don't need anymore. (And close the identity index.) 
*/ + pfree(ident_key); + free_index_insert_state(iistate); + + /* + * Swap the relations and their TOAST relations and TOAST indexes. This + * also drops the new relation and its indexes. + * + * (System catalogs are currently not supported.) + */ + Assert(!is_system_catalog); + finish_heap_swap(old_table_oid, new_table_oid, + is_system_catalog, + swap_toast_by_content, + false, true, false, + frozenXid, cutoffMulti, + relpersistence); + + pfree(rri); +} + +/* + * Build indexes on NewHeap according to those on OldHeap. + * + * OldIndexes is the list of index OIDs on OldHeap. + * + * A list of OIDs of the corresponding indexes created on NewHeap is + * returned. The order of items does match, so we can use these arrays to swap + * index storage. + */ +static List * +build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) +{ + StringInfo ind_name; + ListCell *lc; + List *result = NIL; + + pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE, + PROGRESS_CLUSTER_PHASE_REBUILD_INDEX); + + ind_name = makeStringInfo(); + + foreach(lc, OldIndexes) + { + Oid ind_oid, + ind_oid_new, + tbsp_oid; + Relation ind; + IndexInfo *ind_info; + int i, + heap_col_id; + List *colnames; + int16 indnatts; + Oid *collations, + *opclasses; + HeapTuple tup; + bool isnull; + Datum d; + oidvector *oidvec; + int2vector *int2vec; + size_t oid_arr_size; + size_t int2_arr_size; + int16 *indoptions; + text *reloptions = NULL; + bits16 flags; + Datum *opclassOptions; + NullableDatum *stattargets; + + ind_oid = lfirst_oid(lc); + ind = index_open(ind_oid, AccessShareLock); + ind_info = BuildIndexInfo(ind); + + tbsp_oid = ind->rd_rel->reltablespace; + /* + * Index name really doesn't matter, we'll eventually use only their + * storage. Just make them unique within the table. 
+ */ + resetStringInfo(ind_name); + appendStringInfo(ind_name, "ind_%d", + list_cell_number(OldIndexes, lc)); + + flags = 0; + if (ind->rd_index->indisprimary) + flags |= INDEX_CREATE_IS_PRIMARY; + + colnames = NIL; + indnatts = ind->rd_index->indnatts; + oid_arr_size = sizeof(Oid) * indnatts; + int2_arr_size = sizeof(int16) * indnatts; + + collations = (Oid *) palloc(oid_arr_size); + for (i = 0; i < indnatts; i++) + { + char *colname; + + heap_col_id = ind->rd_index->indkey.values[i]; + if (heap_col_id > 0) + { + Form_pg_attribute att; + + /* Normal attribute. */ + att = TupleDescAttr(OldHeap->rd_att, heap_col_id - 1); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + } + else if (heap_col_id == 0) + { + HeapTuple tuple; + Form_pg_attribute att; + + /* + * Expression column is not present in relcache. What we need + * here is an attribute of the *index* relation. + */ + tuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(ind_oid), + Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, + "cache lookup failed for attribute %d of relation %u", + i + 1, ind_oid); + att = (Form_pg_attribute) GETSTRUCT(tuple); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + ReleaseSysCache(tuple); + } + else + elog(ERROR, "Unexpected column number: %d", + heap_col_id); + + colnames = lappend(colnames, colname); + } + + /* + * Special effort needed for variable length attributes of + * Form_pg_index. 
+ */ + tup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index %u", ind_oid); + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indclass, &isnull); + Assert(!isnull); + oidvec = (oidvector *) DatumGetPointer(d); + opclasses = (Oid *) palloc(oid_arr_size); + memcpy(opclasses, oidvec->values, oid_arr_size); + + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indoption, + &isnull); + Assert(!isnull); + int2vec = (int2vector *) DatumGetPointer(d); + indoptions = (int16 *) palloc(int2_arr_size); + memcpy(indoptions, int2vec->values, int2_arr_size); + ReleaseSysCache(tup); + + tup = SearchSysCache1(RELOID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index relation %u", ind_oid); + d = SysCacheGetAttr(RELOID, tup, Anum_pg_class_reloptions, &isnull); + reloptions = !isnull ? DatumGetTextPCopy(d) : NULL; + ReleaseSysCache(tup); + + opclassOptions = palloc0(sizeof(Datum) * ind_info->ii_NumIndexAttrs); + for (i = 0; i < ind_info->ii_NumIndexAttrs; i++) + opclassOptions[i] = get_attoptions(ind_oid, i + 1); + + stattargets = get_index_stattargets(ind_oid, ind_info); + + /* + * Neither parentIndexRelid nor parentConstraintId needs to be passed + * since the new catalog entries (pg_constraint, pg_inherits) would + * eventually be dropped. Therefore there's no need to record valid + * dependency on parents. 
+ */ + ind_oid_new = index_create(NewHeap, + ind_name->data, + InvalidOid, + InvalidOid, /* parentIndexRelid */ + InvalidOid, /* parentConstraintId */ + InvalidOid, + ind_info, + colnames, + ind->rd_rel->relam, + tbsp_oid, + collations, + opclasses, + opclassOptions, + indoptions, + stattargets, + PointerGetDatum(reloptions), + flags, /* flags */ + 0, /* constr_flags */ + false, /* allow_system_table_mods */ + false, /* is_internal */ + NULL /* constraintId */ + ); + result = lappend_oid(result, ind_oid_new); + + index_close(ind, AccessShareLock); + list_free_deep(colnames); + pfree(collations); + pfree(opclasses); + pfree(indoptions); + if (reloptions) + pfree(reloptions); + } + + return result; +} + +static void +init_rel_reopen_info(RelReopenInfo *rri, Relation *rel_p, Oid relid, + LOCKMODE lockmode_orig, LOCKMODE lockmode_new) +{ + rri->rel_p = rel_p; + rri->relid = relid; + rri->lockmode_orig = lockmode_orig; + rri->lockmode_new = lockmode_new; +} + +/* + * Unlock and close relations specified by items of the 'rels' array. 'nrel' + * is the number of items. + * + * Information needed to (re)open the relations (or to issue a meaningful + * ERROR) is added to the array items. + */ +static void +unlock_and_close_relations(RelReopenInfo *rels, int nrel) +{ + int i; + RelReopenInfo *rri; + + /* + * First, retrieve the information that we will need for re-opening. + * + * We could close (and unlock) each relation as soon as we have gathered + * the related information, but then we would have to be careful not to + * unlock the table until we have the info on all its indexes. (Once we + * unlock the table, any index can be dropped, and thus we can fail to get + * the name we want to report if re-opening fails.) It seems simpler to + * separate the work into two iterations.
+ */ + for (i = 0; i < nrel; i++) + { + Relation rel; + + rri = &rels[i]; + rel = *rri->rel_p; + + if (rel) + { + Assert(CheckRelationLockedByMe(rel, rri->lockmode_orig, false)); + Assert(!OidIsValid(rri->relid)); + + rri->relid = RelationGetRelid(rel); + rri->relkind = rel->rd_rel->relkind; + rri->relname = pstrdup(RelationGetRelationName(rel)); + } + else + { + Assert(OidIsValid(rri->relid)); + + rri->relname = get_rel_name(rri->relid); + rri->relkind = get_rel_relkind(rri->relid); + } + } + + /* Second, close the relations. */ + for (i = 0; i < nrel; i++) + { + Relation rel; + + rri = &rels[i]; + rel = *rri->rel_p; + + /* Close the relation if the caller passed one. */ + if (rel) + { + if (rri->relkind == RELKIND_RELATION) + table_close(rel, rri->lockmode_orig); + else + { + Assert(rri->relkind == RELKIND_INDEX); + + index_close(rel, rri->lockmode_orig); + } + } + } +} + +/* + * Re-open the relations closed previously by unlock_and_close_relations(). + */ +static void +reopen_relations(RelReopenInfo *rels, int nrel) +{ + for (int i = 0; i < nrel; i++) + { + RelReopenInfo *rri = &rels[i]; + Relation rel; + + if (rri->relkind == RELKIND_RELATION) + { + rel = try_table_open(rri->relid, rri->lockmode_new); + } + else + { + Assert(rri->relkind == RELKIND_INDEX); + + rel = try_index_open(rri->relid, rri->lockmode_new); + } + + if (rel == NULL) + { + const char *kind_str; + + kind_str = (rri->relkind == RELKIND_RELATION) ? 
"table" : "index"; + ereport(ERROR, + (errmsg("could not open %s \"%s\"", kind_str, + rri->relname), + errhint("the %s could have been dropped by another transaction", + kind_str))); + } + *rri->rel_p = rel; + + pfree(rri->relname); + } +} diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 488ca950d9..af1945e1ed 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -873,7 +873,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) { - finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, true, RecentXmin, ReadNextMultiXactId(), relpersistence); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 5d6151dad1..13f32ede92 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -4395,6 +4395,16 @@ AlterTableInternal(Oid relid, List *cmds, bool recurse) rel = relation_open(relid, lockmode); + /* + * If lockmode allows, check if VACUUM FULL / CLUSTER CONCURRENTLY is in + * progress. If lockmode is too weak, cluster_rel() should detect + * incompatible DDLs executed by us. + * + * XXX We might skip the changes for DDLs which do not change the tuple + * descriptor.
+ */ + check_for_concurrent_cluster(relid, lockmode); + EventTriggerAlterTableRelid(relid); ATController(NULL, rel, cmds, recurse, lockmode, NULL); @@ -5861,6 +5871,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode, finish_heap_swap(tab->relid, OIDNewHeap, false, false, true, !OidIsValid(tab->newTableSpace), + true, RecentXmin, ReadNextMultiXactId(), persistence); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 0bd000acc5..529c46c186 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -112,7 +112,8 @@ static void vac_truncate_clog(TransactionId frozenXID, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy); + BufferAccessStrategy bstrategy, bool isTopLevel, + bool whole_database); static double compute_parallel_delay(void); static VacOptValue get_vacoptval_from_boolean(DefElem *def); static bool vac_tid_reaped(ItemPointer itemptr, void *state); @@ -153,6 +154,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) bool analyze = false; bool freeze = false; bool full = false; + bool concurrent = false; bool disable_page_skipping = false; bool process_main = true; bool process_toast = true; @@ -226,6 +228,8 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) freeze = defGetBoolean(opt); else if (strcmp(opt->defname, "full") == 0) full = defGetBoolean(opt); + else if (strcmp(opt->defname, "concurrently") == 0) + concurrent = defGetBoolean(opt); else if (strcmp(opt->defname, "disable_page_skipping") == 0) disable_page_skipping = defGetBoolean(opt); else if (strcmp(opt->defname, "index_cleanup") == 0) @@ -300,7 +304,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) (skip_locked ? VACOPT_SKIP_LOCKED : 0) | (analyze ? VACOPT_ANALYZE : 0) | (freeze ? VACOPT_FREEZE : 0) | - (full ? 
VACOPT_FULL : 0) | + (full ? (concurrent ? VACOPT_FULL_CONCURRENT : VACOPT_FULL_EXCLUSIVE) : 0) | (disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0) | (process_main ? VACOPT_PROCESS_MAIN : 0) | (process_toast ? VACOPT_PROCESS_TOAST : 0) | @@ -380,6 +384,12 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) errmsg("ONLY_DATABASE_STATS cannot be specified with other VACUUM options"))); } + /* This problem cannot be identified from the options. */ + if (concurrent && !full) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CONCURRENTLY can only be specified with VACUUM FULL"))); + /* * All freeze ages are zero if the FREEZE option is given; otherwise pass * them as -1 which means to use the default values. @@ -483,6 +493,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, const char *stmttype; volatile bool in_outer_xact, use_own_xacts; + bool whole_database = false; Assert(params != NULL); @@ -543,7 +554,15 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, relations = newrels; } else + { relations = get_all_vacuum_rels(vac_context, params->options); + /* + * If all tables should be processed, the CONCURRENTLY option implies + * that we should skip system relations rather than raising ERRORs. + */ + if (params->options & VACOPT_FULL_CONCURRENT) + whole_database = true; + } /* * Decide whether we need to start/commit our own transactions. @@ -619,7 +638,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, if (params->options & VACOPT_VACUUM) { - if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy)) + if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy, + isTopLevel, whole_database)) continue; } @@ -1932,10 +1952,14 @@ vac_truncate_clog(TransactionId frozenXID, /* * vacuum_rel() -- vacuum one heap relation * - * relid identifies the relation to vacuum. 
If relation is supplied, - * use the name therein for reporting any failure to open/lock the rel; - * do not use it once we've successfully opened the rel, since it might - * be stale. + * relid identifies the relation to vacuum. If relation is supplied, use + * the name therein for reporting any failure to open/lock the rel; do + * not use it once we've successfully opened the rel, since it might be + * stale. + * + * If whole_database is true, we are processing all the relations of the + * current database. In that case we might need to silently skip + * relations which could otherwise cause ERROR. * * Returns true if it's okay to proceed with a requested ANALYZE * operation on this table. @@ -1950,7 +1974,8 @@ vac_truncate_clog(TransactionId frozenXID, */ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy) + BufferAccessStrategy bstrategy, bool isTopLevel, + bool whole_database) { LOCKMODE lmode; Relation rel; @@ -2013,10 +2038,11 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, /* * Determine the type of lock we want --- hard exclusive lock for a FULL - * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either - * way, we can be sure that no other backend is vacuuming the same table. + * exclusive vacuum, but a weaker lock (ShareUpdateExclusiveLock) for + * concurrent vacuum. Either way, we can be sure that no other backend is + * vacuuming the same table. */ - lmode = (params->options & VACOPT_FULL) ? + lmode = (params->options & VACOPT_FULL_EXCLUSIVE) ? AccessExclusiveLock : ShareUpdateExclusiveLock; /* open the relation and get the appropriate lock on it */ @@ -2031,6 +2057,39 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, return false; } + /* + * Leave if the CONCURRENTLY option was passed, but the relation is not + * suitable for that. Note that we only skip such relations if the user + * wants to vacuum the whole database. 
In contrast, if he specified + * inappropriate relation(s) explicitly, the command will end up with + * ERROR. + */ + if (whole_database && (params->options & VACOPT_FULL_CONCURRENT) && + !check_relation_is_clusterable_concurrently(rel, DEBUG1, + "VACUUM (FULL, CONCURRENTLY)")) + { + relation_close(rel, lmode); + PopActiveSnapshot(); + CommitTransactionCommand(); + return false; + } + + /* + * Skip the relation if VACUUM FULL / CLUSTER CONCURRENTLY is in progress + * as it will drop the current storage of the relation. + * + * This check should not take place until we have a lock that prevents + * another backend from starting VACUUM FULL / CLUSTER CONCURRENTLY later. + */ + Assert(lmode >= LOCK_CLUSTER_CONCURRENT); + if (is_concurrent_cluster_in_progress(relid)) + { + relation_close(rel, lmode); + PopActiveSnapshot(); + CommitTransactionCommand(); + return false; + } + /* * When recursing to a TOAST table, check privileges on the parent. NB: * This is only safe to do because we hold a session lock on the main @@ -2104,19 +2163,6 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, return true; } - /* - * Get a session-level lock too. This will protect our access to the - * relation across multiple transactions, so that we can vacuum the - * relation's TOAST table (if any) secure in the knowledge that no one is - * deleting the parent relation. - * - * NOTE: this cannot block, even if someone else is waiting for access, - * because the lock manager knows that both lock requests are from the - * same process. - */ - lockrelid = rel->rd_lockInfo.lockRelId; - LockRelationIdForSession(&lockrelid, lmode); - /* * Set index_cleanup option based on index_cleanup reloption if it wasn't * specified in VACUUM command, or when running in an autovacuum worker @@ -2169,6 +2215,30 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, else toast_relid = InvalidOid; + /* + * Get a session-level lock too. 
This will protect our access to the + * relation across multiple transactions, so that we can vacuum the + * relation's TOAST table (if any) secure in the knowledge that no one is + * deleting the parent relation. + * + * NOTE: this cannot block, even if someone else is waiting for access, + * because the lock manager knows that both lock requests are from the + * same process. + */ + if (OidIsValid(toast_relid)) + { + /* + * You might worry that, in the VACUUM (FULL, CONCURRENTLY) case, + * cluster_rel() needs to release all the locks on the relation at + * some point, but this session lock makes it impossible. In fact, + * cluster_rel() will eventually be called for the TOAST relation + * and raise ERROR because, in the concurrent mode, it cannot process + * a TOAST relation alone anyway. + */ + lockrelid = rel->rd_lockInfo.lockRelId; + LockRelationIdForSession(&lockrelid, lmode); + } + /* * Switch to the table owner's userid, so that any index functions are run * as that user. Also lock down security-restricted operations and @@ -2196,11 +2266,22 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, { ClusterParams cluster_params = {0}; + /* + * Invalid toast_relid means that there is no session lock on the + * relation. Such a lock would be a problem because it would + * prevent cluster_rel() from releasing all locks when it tries to + * get AccessExclusiveLock.
+ */ + Assert(!OidIsValid(toast_relid)); + if ((params->options & VACOPT_VERBOSE) != 0) cluster_params.options |= CLUOPT_VERBOSE; + if ((params->options & VACOPT_FULL_CONCURRENT) != 0) + cluster_params.options |= CLUOPT_CONCURRENT; + /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ - cluster_rel(rel, InvalidOid, &cluster_params); + cluster_rel(rel, InvalidOid, &cluster_params, isTopLevel); /* * cluster_rel() should have closed the relation, lock is kept @@ -2249,13 +2330,15 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, toast_vacuum_params.options |= VACOPT_PROCESS_MAIN; toast_vacuum_params.toast_parent = relid; - vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy); + vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy, + isTopLevel, whole_database); } /* * Now release the session-level lock on the main table. */ - UnlockRelationIdForSession(&lockrelid, lmode); + if (OidIsValid(toast_relid)) + UnlockRelationIdForSession(&lockrelid, lmode); /* Report that we really did it. */ return true; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d687ceee33..066d96dea2 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -467,6 +467,57 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + /* + * If the change is not intended for logical decoding, do not even + * establish transaction for it. This is particularly important if the + * record was generated by CLUSTER CONCURRENTLY because this command uses + * the original XID when doing changes in the new storage. The decoding + * subsystem probably does not expect to see the same transaction multiple + * times. 
+ */ + switch (info) + { + case XLOG_HEAP_INSERT: + { + xl_heap_insert *rec; + + rec = (xl_heap_insert *) XLogRecGetData(buf->record); + /* + * (This does happen when raw_heap_insert marks the TOAST record + * as HEAP_INSERT_NO_LOGICAL). + */ + if ((rec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) == 0) + return; + + break; + } + + case XLOG_HEAP_HOT_UPDATE: + case XLOG_HEAP_UPDATE: + { + xl_heap_update *rec; + + rec = (xl_heap_update *) XLogRecGetData(buf->record); + if ((rec->flags & + (XLH_UPDATE_CONTAINS_NEW_TUPLE | + XLH_UPDATE_CONTAINS_OLD_TUPLE | + XLH_UPDATE_CONTAINS_OLD_KEY)) == 0) + return; + + break; + } + + case XLOG_HEAP_DELETE: + { + xl_heap_delete *rec; + + rec = (xl_heap_delete *) XLogRecGetData(buf->record); + if (rec->flags & XLH_DELETE_NO_LOGICAL) + return; + break; + } + } + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* @@ -903,13 +954,6 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xlrec = (xl_heap_insert *) XLogRecGetData(r); - /* - * Ignore insert records without new tuples (this does happen when - * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL). 
- */ - if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) - return; - /* only interested in our database */ XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); if (target_locator.dbOid != ctx->slot->data.database) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e37e22f441..ed15a0b175 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -286,7 +286,7 @@ static bool ExportInProgress = false; static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ -static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); +static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn); static void SnapBuildFreeSnapshot(Snapshot snap); @@ -481,12 +481,17 @@ SnapBuildSnapDecRefcount(Snapshot snap) * Build a new snapshot, based on currently committed catalog-modifying * transactions. * + * 'lsn' is the location of the commit record (of a catalog-changing + * transaction) that triggered creation of the snapshot. Pass + * InvalidXLogRecPtr for the transaction base snapshot or if the user of + * the snapshot does not need the LSN. + * * In-progress transactions with catalog access are *not* allowed to modify * these snapshots; they have to copy them and fill in appropriate ->curcid * and ->subxip/subxcnt values.
*/ static Snapshot -SnapBuildBuildSnapshot(SnapBuild *builder) +SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn) { Snapshot snapshot; Size ssize; @@ -554,6 +559,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder) snapshot->active_count = 0; snapshot->regd_count = 0; snapshot->snapXactCompletionCount = 0; + snapshot->lsn = lsn; return snapshot; } @@ -569,10 +575,7 @@ Snapshot SnapBuildInitialSnapshot(SnapBuild *builder) { Snapshot snap; - TransactionId xid; TransactionId safeXid; - TransactionId *newxip; - int newxcnt = 0; Assert(XactIsoLevel == XACT_REPEATABLE_READ); Assert(builder->building_full_snapshot); @@ -593,7 +596,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) if (TransactionIdIsValid(MyProc->xmin)) elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid"); - snap = SnapBuildBuildSnapshot(builder); + snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr); /* * We know that snap->xmin is alive, enforced by the logical xmin @@ -614,6 +617,47 @@ SnapBuildInitialSnapshot(SnapBuild *builder) MyProc->xmin = snap->xmin; + /* Convert the historic snapshot to MVCC snapshot. */ + return SnapBuildMVCCFromHistoric(snap, true); +} + +/* + * Build an MVCC snapshot for the initial data load performed by CLUSTER + * CONCURRENTLY command. + * + * The snapshot will only be used to scan one particular relation, which is + * treated like a catalog (therefore ->building_full_snapshot is not + * important), and the caller should already have a replication slot setup (so + * we do not set MyProc->xmin). XXX Do we yet need to add some restrictions? + */ +Snapshot +SnapBuildInitialSnapshotForCluster(SnapBuild *builder) +{ + Snapshot snap; + + Assert(builder->state == SNAPBUILD_CONSISTENT); + + snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr); + return SnapBuildMVCCFromHistoric(snap, false); +} + +/* + * Turn a historic MVCC snapshot into an ordinary MVCC snapshot. 
+ * + * Pass true for 'in_place' if you don't care about modifying the source + * snapshot. If you need a new instance, and one that was allocated as a + * single chunk of memory, pass false. + */ +Snapshot +SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place) +{ + TransactionId xid; + TransactionId *oldxip = snapshot->xip; + uint32 oldxcnt = snapshot->xcnt; + TransactionId *newxip; + int newxcnt = 0; + Snapshot result; + /* allocate in transaction context */ newxip = (TransactionId *) palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount()); @@ -624,7 +668,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * classical snapshot by marking all non-committed transactions as * in-progress. This can be expensive. */ - for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);) + for (xid = snapshot->xmin; NormalTransactionIdPrecedes(xid, snapshot->xmax);) { void *test; @@ -632,7 +676,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * Check whether transaction committed using the decoding snapshot * meaning of ->xip. */ - test = bsearch(&xid, snap->xip, snap->xcnt, + test = bsearch(&xid, snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator); if (test == NULL) @@ -649,11 +693,22 @@ SnapBuildInitialSnapshot(SnapBuild *builder) } /* adjust remaining snapshot fields as needed */ - snap->snapshot_type = SNAPSHOT_MVCC; - snap->xcnt = newxcnt; - snap->xip = newxip; + snapshot->xcnt = newxcnt; + snapshot->xip = newxip; + + if (in_place) + result = snapshot; + else + { + result = CopySnapshot(snapshot); + + /* Restore the original values so the source is intact. 
*/ + snapshot->xip = oldxip; + snapshot->xcnt = oldxcnt; + } + result->snapshot_type = SNAPSHOT_MVCC; - return snap; + return result; } /* @@ -712,7 +767,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder); + builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -792,7 +847,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder); + builder->snapshot = SnapBuildBuildSnapshot(builder, lsn); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -1161,7 +1216,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, if (builder->snapshot) SnapBuildSnapDecRefcount(builder->snapshot); - builder->snapshot = SnapBuildBuildSnapshot(builder); + builder->snapshot = SnapBuildBuildSnapshot(builder, lsn); /* we might need to execute invalidations, add snapshot */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) @@ -1989,7 +2044,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) { SnapBuildSnapDecRefcount(builder->snapshot); } - builder->snapshot = SnapBuildBuildSnapshot(builder); + builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr); SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferSetRestartPoint(builder->reorder, lsn); diff --git a/src/backend/replication/pgoutput_cluster/Makefile b/src/backend/replication/pgoutput_cluster/Makefile new file mode 100644 index 0000000000..31471bb546 --- /dev/null +++ b/src/backend/replication/pgoutput_cluster/Makefile @@ -0,0 +1,32 @@ 
+#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/pgoutput_cluster +# +# IDENTIFICATION +# src/backend/replication/pgoutput_cluster +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/pgoutput_cluster +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + $(WIN32RES) \ + pgoutput_cluster.o +PGFILEDESC = "pgoutput_cluster - logical replication output plugin for CLUSTER command" +NAME = pgoutput_cluster + +all: all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/pgoutput_cluster/meson.build b/src/backend/replication/pgoutput_cluster/meson.build new file mode 100644 index 0000000000..0f033064f2 --- /dev/null +++ b/src/backend/replication/pgoutput_cluster/meson.build @@ -0,0 +1,18 @@ +# Copyright (c) 2022-2024, PostgreSQL Global Development Group + +pgoutput_cluster_sources = files( + 'pgoutput_cluster.c', +) + +if host_system == 'windows' + pgoutput_cluster_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pgoutput_cluster', + '--FILEDESC', 'pgoutput_cluster - logical replication output plugin for CLUSTER command',]) +endif + +pgoutput_cluster = shared_module('pgoutput_cluster', + pgoutput_cluster_sources, + kwargs: pg_mod_args, +) + +backend_targets += pgoutput_cluster diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c new file mode 100644 index 0000000000..9fe44017a8 --- /dev/null +++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c @@ -0,0 +1,321 @@ +/* TODO Move into src/backend/cluster/ (and rename?) 
*/ +/*------------------------------------------------------------------------- + * + * pgoutput_cluster.c + * Logical Replication output plugin for CLUSTER command + * + * Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/pgoutput_cluster/pgoutput_cluster.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heaptoast.h" +#include "commands/cluster.h" +#include "replication/snapbuild.h" + +PG_MODULE_MAGIC; + +static void plugin_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void plugin_shutdown(LogicalDecodingContext *ctx); +static void plugin_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void plugin_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation rel, ReorderBufferChange *change); +static void plugin_truncate(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, + Relation relations[], + ReorderBufferChange *change); +static void store_change(LogicalDecodingContext *ctx, + ConcurrentChangeKind kind, HeapTuple tuple, + TransactionId xid); + +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + + cb->startup_cb = plugin_startup; + cb->begin_cb = plugin_begin_txn; + cb->change_cb = plugin_change; + cb->truncate_cb = plugin_truncate; + cb->commit_cb = plugin_commit_txn; + cb->shutdown_cb = plugin_shutdown; +} + + +/* initialize this plugin */ +static void +plugin_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) +{ + ctx->output_plugin_private = NULL; + + /* Probably unnecessary, as we don't use the SQL interface ... 
*/ + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + + if (ctx->output_plugin_options != NIL) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("This plugin does not expect any options"))); + } +} + +static void +plugin_shutdown(LogicalDecodingContext *ctx) +{ +} + +/* + * As we don't release the slot during processing of particular table, there's + * no room for SQL interface, even for debugging purposes. Therefore we need + * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin + * callbacks. (Although we might want to write custom callbacks, this API + * seems to be unnecessarily generic for our purposes.) + */ + +/* BEGIN callback */ +static void +plugin_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ +} + +/* COMMIT callback */ +static void +plugin_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ +} + +/* + * Callback for individual changed tuples + */ +static void +plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + ClusterDecodingState *dstate; + Snapshot snapshot; + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + + /* Only interested in one particular relation. */ + if (relation->rd_id != dstate->relid) + return; + + /* + * Catalog snapshot is fine because the table we are processing is + * temporarily considered a user catalog table. + */ + snapshot = GetCatalogSnapshot(InvalidOid); + Assert(snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC); + Assert(!snapshot->suboverflowed); + + /* + * This should not happen, but if we don't have enough information to + * apply a new snapshot, the consequences would be bad. Thus prefer ERROR + * to Assert(). 
+ */ + if (XLogRecPtrIsInvalid(snapshot->lsn)) + ereport(ERROR, (errmsg("snapshot has invalid LSN"))); + + /* + * reorderbuffer.c changes the catalog snapshot as soon as it sees a new + * CID or a commit record of a catalog-changing transaction. + */ + if (dstate->snapshot == NULL || snapshot->lsn != dstate->snapshot_lsn || + snapshot->curcid != dstate->snapshot->curcid) + { + /* CID should not go backwards. */ + Assert(dstate->snapshot == NULL || + snapshot->curcid >= dstate->snapshot->curcid); + + /* + * XXX Is it a problem that the copy is created in + * TopTransactionContext? + */ + dstate->snapshot = SnapBuildMVCCFromHistoric(snapshot, false); + dstate->snapshot_lsn = snapshot->lsn; + } + + /* Decode entry depending on its type */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newtuple; + + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + /* + * Identity checks in the main function should have made this + * impossible. + */ + if (newtuple == NULL) + elog(ERROR, "Incomplete insert info."); + + store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid); + } + break; + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple oldtuple, + newtuple; + + oldtuple = change->data.tp.oldtuple != NULL ? + change->data.tp.oldtuple : NULL; + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + if (newtuple == NULL) + elog(ERROR, "Incomplete update info."); + + if (oldtuple != NULL) + store_change(ctx, CHANGE_UPDATE_OLD, oldtuple, + change->txn->xid); + + store_change(ctx, CHANGE_UPDATE_NEW, newtuple, + change->txn->xid); + } + break; + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldtuple; + + oldtuple = change->data.tp.oldtuple ? 
+ change->data.tp.oldtuple : NULL; + + if (oldtuple == NULL) + elog(ERROR, "Incomplete delete info."); + + store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid); + } + break; + default: + /* Should not come here */ + Assert(false); + break; + } +} + +static void +plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + ClusterDecodingState *dstate; + int i; + Relation relation = NULL; + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + + /* Find the relation we are processing. */ + for (i = 0; i < nrelations; i++) + { + relation = relations[i]; + + if (RelationGetRelid(relation) == dstate->relid) + break; + } + + /* Is this truncation of another relation? */ + if (i == nrelations) + return; + + store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId); +} + +/* Store concurrent data change. */ +static void +store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, + HeapTuple tuple, TransactionId xid) +{ + ClusterDecodingState *dstate; + char *change_raw; + ConcurrentChange *change; + bool flattened = false; + Size size; + Datum values[1]; + bool isnull[1]; + char *dst; + + dstate = (ClusterDecodingState *) ctx->output_writer_private; + + size = MAXALIGN(VARHDRSZ) + sizeof(ConcurrentChange); + + if (tuple) + { + /* + * ReorderBufferCommit() stores the TOAST chunks in its private memory + * context and frees them after having called + * apply_change(). Therefore we need flat copy (including TOAST) that + * we eventually copy into the memory context which is available to + * decode_concurrent_changes(). + */ + if (HeapTupleHasExternal(tuple)) + { + /* + * toast_flatten_tuple_to_datum() might be more convenient but we + * don't want the decompression it does. + */ + tuple = toast_flatten_tuple(tuple, dstate->tupdesc); + flattened = true; + } + + size += tuple->t_len; + } + + /* XXX Isn't there any function / macro to do this? 
*/
+	if (size >= 0x3FFFFFFF)
+		elog(ERROR, "Change is too big.");
+
+	/* Construct the change. */
+	change_raw = (char *) palloc0(size);
+	SET_VARSIZE(change_raw, size);
+	change = (ConcurrentChange *) VARDATA(change_raw);
+	change->kind = kind;
+
+	/* No other information is needed for TRUNCATE. */
+	if (change->kind == CHANGE_TRUNCATE)
+		goto store;
+
+	/*
+	 * Copy the tuple.
+	 *
+	 * CAUTION: change->tup_data.t_data must be fixed on retrieval!
+	 */
+	memcpy(&change->tup_data, tuple, sizeof(HeapTupleData));
+	dst = (char *) change + sizeof(ConcurrentChange);
+	memcpy(dst, tuple->t_data, tuple->t_len);
+
+	/* Initialize the other fields. */
+	change->xid = xid;
+	change->snapshot = dstate->snapshot;
+	dstate->snapshot->active_count++;
+
+	/* The data has been copied. */
+	if (flattened)
+		pfree(tuple);
+
+store:
+	/* Store as tuple of 1 bytea column. */
+	values[0] = PointerGetDatum(change_raw);
+	isnull[0] = false;
+	tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
+						 values, isnull);
+
+	/* Accounting. */
+	dstate->nchanges++;
+
+	/* Cleanup. */
+	pfree(change_raw);
+}
+
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2100150f01..a84de0611a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventCustomShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, ClusterShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -357,6 +359,7 @@ CreateOrAttachShmemStructs(void)
 	StatsShmemInit();
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
+	ClusterShmemInit();
 }
 
 /*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index fa66b8017e..a6dda9b520 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1299,6 +1299,17 @@ ProcessUtilitySlow(ParseState *pstate,
 					lockmode = AlterTableGetLockLevel(atstmt->cmds);
 					relid = AlterTableLookupRelation(atstmt, lockmode);
 
+					/*
+					 * If lockmode allows, check if VACUUM FULL / CLUSTER
+					 * CONCURRENT is in progress. If lockmode is too weak,
+					 * cluster_rel() should detect incompatible DDLs executed
+					 * by us.
+					 *
+					 * XXX We might skip the changes for DDLs which do not
+					 * change the tuple descriptor.
+					 */
+					check_for_concurrent_cluster(relid, lockmode);
+
 					if (OidIsValid(relid))
 					{
 						AlterTableUtilityContext atcontext;
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index e7c8bfba94..c52ec92a97 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -163,3 +163,19 @@ pgstat_progress_end_command(void)
 	beentry->st_progress.command_target = InvalidOid;
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
+
+void
+pgstat_progress_restore_state(PgBackendProgress *backup)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!beentry || !pgstat_track_activities)
+		return;
+
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+	beentry->st_progress.command = backup->command;
+	beentry->st_progress.command_target = backup->command_target;
+	memcpy(MyBEEntry->st_progress.param, backup->param,
+		   sizeof(beentry->st_progress.param));
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index db37beeaae..8245be7846 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -345,6 +345,7 @@ WALSummarizer	"Waiting to read or update WAL summarization state."
 DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
+ClusteredRels	"Waiting to read or update information on tables being clustered concurrently."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 603aa4157b..5a2d5d6138 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -1373,6 +1373,28 @@ CacheInvalidateRelcache(Relation relation)
 	RegisterRelcacheInvalidation(databaseId, relationId);
 }
 
+/*
+ * CacheInvalidateRelcacheImmediate
+ *		Send invalidation message for the specified relation's relcache entry.
+ *
+ * Currently this is used in VACUUM FULL/CLUSTER CONCURRENTLY, to make sure
+ * that other backends are aware that the command is being executed for the
+ * relation.
+ */
+void
+CacheInvalidateRelcacheImmediate(Relation relation)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rc.id = SHAREDINVALRELCACHE_ID;
+	msg.rc.dbId = MyDatabaseId;
+	msg.rc.relId = RelationGetRelid(relation);
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
 /*
  * CacheInvalidateRelcacheAll
  *		Register invalidation of the whole relcache at the end of command.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 66ed24e401..708d1ee27a 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -64,6 +64,7 @@
 #include "catalog/pg_type.h"
 #include "catalog/schemapg.h"
 #include "catalog/storage.h"
+#include "commands/cluster.h"
 #include "commands/policy.h"
 #include "commands/publicationcmds.h"
 #include "commands/trigger.h"
@@ -1257,6 +1258,10 @@ retry:
 	/* make sure relation is marked as having no open file yet */
 	relation->rd_smgr = NULL;
 
+	/* Is CLUSTER CONCURRENTLY in progress? */
+	relation->rd_cluster_concurrent =
+		is_concurrent_cluster_in_progress(targetRelId);
+
 	/*
 	 * now we can free the memory allocated for pg_class_tuple
 	 */
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7d2b34d4f2..6be0fef84c 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -155,9 +155,7 @@ typedef struct ExportedSnapshot
 static List *exportedSnapshots = NIL;
 
 /* Prototypes for local functions */
-static Snapshot CopySnapshot(Snapshot snapshot);
 static void UnregisterSnapshotNoOwner(Snapshot snapshot);
-static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
 
 /* ResourceOwner callbacks to track snapshot references */
@@ -570,7 +568,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
  * to 0.  The returned snapshot has the copied flag set.
  */
-static Snapshot
+Snapshot
 CopySnapshot(Snapshot snapshot)
 {
 	Snapshot	newsnap;
@@ -626,7 +624,7 @@ CopySnapshot(Snapshot snapshot)
  * FreeSnapshot
  *		Free the memory associated with a snapshot.
  */
-static void
+void
 FreeSnapshot(Snapshot snapshot)
 {
 	Assert(snapshot->regd_count == 0);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d9..6cab6ed5ee 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2787,7 +2787,7 @@ psql_completion(const char *text, int start, int end)
 		 * one word, so the above test is correct.
 		 */
 		if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
-			COMPLETE_WITH("VERBOSE");
+			COMPLETE_WITH("VERBOSE", "CONCURRENTLY");
 	}
 
 /* COMMENT */
@@ -4764,7 +4764,8 @@ psql_completion(const char *text, int start, int end)
 						  "DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
 						  "INDEX_CLEANUP", "PROCESS_MAIN", "PROCESS_TOAST",
 						  "TRUNCATE", "PARALLEL", "SKIP_DATABASE_STATS",
-						  "ONLY_DATABASE_STATS", "BUFFER_USAGE_LIMIT");
+						  "ONLY_DATABASE_STATS", "BUFFER_USAGE_LIMIT",
+						  "CONCURRENTLY");
 		else if (TailMatches("FULL|FREEZE|ANALYZE|VERBOSE|DISABLE_PAGE_SKIPPING|SKIP_LOCKED|PROCESS_MAIN|PROCESS_TOAST|TRUNCATE|SKIP_DATABASE_STATS|ONLY_DATABASE_STATS"))
 			COMPLETE_WITH("ON", "OFF");
 		else if (TailMatches("INDEX_CLEANUP"))
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..e87eb2f861 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -316,21 +316,24 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
-extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-						int options, BulkInsertState bistate);
+extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+						CommandId cid, int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 TransactionId xid, CommandId cid,
+							 Snapshot crosscheck, bool wait,
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 bool wal_logical);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
-							 HeapTuple newtup,
+							 HeapTuple newtup, TransactionId xid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
+							 TU_UpdateIndexes *update_indexes,
+							 bool wal_logical);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
@@ -405,6 +408,10 @@ extern HTSV_Result HeapTupleSatisfiesVacuumHorizon(HeapTuple htup, Buffer buffer
 												   TransactionId *dead_after);
 extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 								 uint16 infomask, TransactionId xid);
+extern bool HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot,
+								  Buffer buffer);
+extern bool HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot,
+									Buffer buffer);
 extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple);
 extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 42736f37e7..1c5cb7c728 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -103,6 +103,8 @@
 #define XLH_DELETE_CONTAINS_OLD_KEY				(1<<2)
 #define XLH_DELETE_IS_SUPER						(1<<3)
 #define XLH_DELETE_IS_PARTITION_MOVE			(1<<4)
+/* See heap_delete() */
+#define XLH_DELETE_NO_LOGICAL					(1<<5)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_DELETE_CONTAINS_OLD						\
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index da661289c1..1380ba81fc 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "executor/tuptable.h"
+#include "replication/logical.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -630,6 +631,8 @@ typedef struct TableAmRoutine
 											  Relation OldIndex,
 											  bool use_sort,
 											  TransactionId OldestXmin,
+											  Snapshot snapshot,
+											  LogicalDecodingContext *decoding_ctx,
 											  TransactionId *xid_cutoff,
 											  MultiXactId *multi_cutoff,
 											  double *num_tuples,
@@ -1667,6 +1670,10 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator)
  *   not needed for the relation's AM
  * - *xid_cutoff - ditto
  * - *multi_cutoff - ditto
+ * - snapshot - if != NULL, ignore data changes done by transactions that this
+ *	 (MVCC) snapshot considers still in-progress or in the future.
+ * - decoding_ctx - logical decoding context, to capture concurrent data
+ *   changes.
  *
  * Output parameters:
  * - *xid_cutoff - rel's new relfrozenxid value, may be invalid
@@ -1679,6 +1686,8 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
 								Relation OldIndex,
 								bool use_sort,
 								TransactionId OldestXmin,
+								Snapshot snapshot,
+								LogicalDecodingContext *decoding_ctx,
 								TransactionId *xid_cutoff,
 								MultiXactId *multi_cutoff,
 								double *num_tuples,
@@ -1687,6 +1696,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
 {
 	OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex,
 													use_sort, OldestXmin,
+													snapshot, decoding_ctx,
 													xid_cutoff, multi_cutoff,
 													num_tuples, tups_vacuumed,
 													tups_recently_dead);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 6d4439f052..e0016631f6 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -476,6 +476,8 @@ extern Size EstimateTransactionStateSpace(void);
 extern void SerializeTransactionState(Size maxsize, char *start_address);
 extern void StartParallelWorkerTransaction(char *tstatespace);
 extern void EndParallelWorkerTransaction(void);
+extern void SetClusterCurrentXids(TransactionId *xip, int xcnt);
+extern void ResetClusterCurrentXids(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7d434f8e65..77d522561b 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -99,6 +99,9 @@ extern Oid index_concurrently_create_copy(Relation heapRelation,
 										   Oid tablespaceOid,
 										   const char *newName);
 
+extern NullableDatum *get_index_stattargets(Oid indexid,
+											IndexInfo *indInfo);
+
 extern void index_concurrently_build(Oid heapRelationId,
 									 Oid indexRelationId);
 
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 7492796ea2..f98b855f21 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -13,10 +13,15 @@
 #ifndef CLUSTER_H
 #define CLUSTER_H
 
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
+#include "replication/logical.h"
 #include "storage/lock.h"
+#include "storage/relfilelocator.h"
 #include "utils/relcache.h"
+#include "utils/resowner.h"
+#include "utils/tuplestore.h"
 
 
 /* flag bits for ClusterParams->options */
@@ -24,6 +29,7 @@
 #define CLUOPT_RECHECK 0x02		/* recheck relation state */
 #define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for
 										 * indisclustered */
+#define CLUOPT_CONCURRENT 0x08	/* allow concurrent data changes */
 
 /* options for CLUSTER */
 typedef struct ClusterParams
@@ -31,12 +37,114 @@ typedef struct ClusterParams
 	bits32		options;		/* bitmask of CLUOPT_* */
 } ClusterParams;
 
+/*
+ * The following definitions are used for concurrent processing.
+ */
+
+/*
+ * Lock level for the concurrent variant of CLUSTER / VACUUM FULL.
+ *
+ * Like for lazy VACUUM, we choose the strongest lock that still allows
+ * INSERT, UPDATE and DELETE.
+ *
+ * Note that the lock needs to be released temporarily a few times during the
+ * processing. In such cases it should be checked after re-locking that the
+ * relation / index hasn't changed in the system catalog while the lock was
+ * not held.
+ */
+#define LOCK_CLUSTER_CONCURRENT	ShareUpdateExclusiveLock
+
+typedef enum
+{
+	CHANGE_INSERT,
+	CHANGE_UPDATE_OLD,
+	CHANGE_UPDATE_NEW,
+	CHANGE_DELETE,
+	CHANGE_TRUNCATE
+} ConcurrentChangeKind;
+
+typedef struct ConcurrentChange
+{
+	/* See the enum above. */
+	ConcurrentChangeKind kind;
+
+	/* Transaction that changes the data. */
+	TransactionId xid;
+
+	/*
+	 * Historic catalog snapshot that was used to decode this change.
+	 */
+	Snapshot	snapshot;
+
+	/*
+	 * The actual tuple.
+	 *
+	 * The tuple data follows the ConcurrentChange structure. Before use make
+	 * sure the tuple is correctly aligned (ConcurrentChange can be stored as
+	 * bytea) and that tuple->t_data is fixed.
+	 */
+	HeapTupleData tup_data;
+} ConcurrentChange;
+
+/*
+ * Logical decoding state.
+ *
+ * Here we store the data changes that we decode from WAL while the table
+ * contents is being copied to a new storage. Also the necessary metadata
+ * needed to apply these changes to the table is stored here.
+ */
+typedef struct ClusterDecodingState
+{
+	/* The relation whose changes we're decoding. */
+	Oid			relid;
+
+	/*
+	 * Decoded changes are stored here. Although we try to avoid excessive
+	 * batches, it can happen that the changes need to be stored to disk. The
+	 * tuplestore does this transparently.
+	 */
+	Tuplestorestate *tstore;
+
+	/* The current number of changes in tstore. */
+	double		nchanges;
+
+	/*
+	 * Descriptor to store the ConcurrentChange structure serialized (bytea).
+	 * We can't store the tuple directly because tuplestore only supports
+	 * minimum tuple and we may need to transfer OID system column from the
+	 * output plugin. Also we need to transfer the change kind, so it's better
+	 * to put everything in the structure than to use 2 tuplestores "in
+	 * parallel".
+	 */
+	TupleDesc	tupdesc_change;
+
+	/* Tuple descriptor needed to update indexes. */
+	TupleDesc	tupdesc;
+
+	/* Slot to retrieve data from tstore. */
+	TupleTableSlot *tsslot;
+
+	/*
+	 * Historic catalog snapshot that was used to decode the most recent
+	 * change.
+	 */
+	Snapshot	snapshot;
+	/* LSN of the record */
+	XLogRecPtr	snapshot_lsn;
+
+	ResourceOwner resowner;
+} ClusterDecodingState;
+
 extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel);
-extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params);
+extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
+						bool isTopLevel);
 extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
 									   LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
-
+extern bool check_relation_is_clusterable_concurrently(Relation rel, int elevel,
+													   const char *stmt);
+extern void cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
+											  XLogRecPtr end_of_wal);
 extern Oid	make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 						  char relpersistence, LOCKMODE lockmode_old,
 						  LOCKMODE *lockmode_new_p);
@@ -45,8 +153,13 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 bool swap_toast_by_content,
 							 bool check_constraints,
 							 bool is_internal,
+							 bool reindex,
 							 TransactionId frozenXid,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
+extern Size ClusterShmemSize(void);
+extern void ClusterShmemInit(void);
+extern bool is_concurrent_cluster_in_progress(Oid relid);
+extern void check_for_concurrent_cluster(Oid relid, LOCKMODE lockmode);
 
 #endif							/* CLUSTER_H */
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 5616d64523..03e3712ede 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -59,19 +59,22 @@
 #define PROGRESS_CLUSTER_PHASE					1
 #define PROGRESS_CLUSTER_INDEX_RELID			2
 #define PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED	3
-#define PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN	4
-#define PROGRESS_CLUSTER_TOTAL_HEAP_BLKS		5
-#define PROGRESS_CLUSTER_HEAP_BLKS_SCANNED		6
-#define PROGRESS_CLUSTER_INDEX_REBUILD_COUNT	7
+#define PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED	4
+#define PROGRESS_CLUSTER_HEAP_TUPLES_UPDATED	5
+#define PROGRESS_CLUSTER_HEAP_TUPLES_DELETED	6
+#define PROGRESS_CLUSTER_TOTAL_HEAP_BLKS		7
+#define PROGRESS_CLUSTER_HEAP_BLKS_SCANNED		8
+#define PROGRESS_CLUSTER_INDEX_REBUILD_COUNT	9
 
 /* Phases of cluster (as advertised via PROGRESS_CLUSTER_PHASE) */
 #define PROGRESS_CLUSTER_PHASE_SEQ_SCAN_HEAP	1
 #define PROGRESS_CLUSTER_PHASE_INDEX_SCAN_HEAP	2
 #define PROGRESS_CLUSTER_PHASE_SORT_TUPLES		3
 #define PROGRESS_CLUSTER_PHASE_WRITE_NEW_HEAP	4
-#define PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES	5
-#define PROGRESS_CLUSTER_PHASE_REBUILD_INDEX	6
-#define PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP	7
+#define PROGRESS_CLUSTER_PHASE_CATCH_UP			5
+#define PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES	6
+#define PROGRESS_CLUSTER_PHASE_REBUILD_INDEX	7
+#define PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP	8
 
 /* Commands of PROGRESS_CLUSTER */
 #define PROGRESS_CLUSTER_COMMAND_CLUSTER		1
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 759f9a87d3..2f693e0fc0 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -181,13 +181,16 @@ typedef struct VacAttrStats
 #define VACOPT_ANALYZE 0x02		/* do ANALYZE */
 #define VACOPT_VERBOSE 0x04		/* output INFO instrumentation messages */
 #define VACOPT_FREEZE 0x08		/* FREEZE option */
-#define VACOPT_FULL 0x10		/* FULL (non-concurrent) vacuum */
-#define VACOPT_SKIP_LOCKED 0x20 /* skip if cannot get lock */
-#define VACOPT_PROCESS_MAIN 0x40	/* process main relation */
-#define VACOPT_PROCESS_TOAST 0x80	/* process the TOAST table, if any */
-#define VACOPT_DISABLE_PAGE_SKIPPING 0x100	/* don't skip any pages */
-#define VACOPT_SKIP_DATABASE_STATS 0x200	/* skip vac_update_datfrozenxid() */
-#define VACOPT_ONLY_DATABASE_STATS 0x400	/* only vac_update_datfrozenxid() */
+#define VACOPT_FULL_EXCLUSIVE 0x10	/* FULL (non-concurrent) vacuum */
+#define VACOPT_FULL_CONCURRENT 0x20 /* FULL (concurrent) vacuum */
+#define VACOPT_SKIP_LOCKED 0x40 /* skip if cannot get lock */
+#define VACOPT_PROCESS_MAIN 0x80	/* process main relation */
+#define VACOPT_PROCESS_TOAST 0x100	/* process the TOAST table, if any */
+#define VACOPT_DISABLE_PAGE_SKIPPING 0x200	/* don't skip any pages */
+#define VACOPT_SKIP_DATABASE_STATS 0x400	/* skip vac_update_datfrozenxid() */
+#define VACOPT_ONLY_DATABASE_STATS 0x800	/* only vac_update_datfrozenxid() */
+
+#define VACOPT_FULL (VACOPT_FULL_EXCLUSIVE | VACOPT_FULL_CONCURRENT)
 
 /*
  * Values used by index_cleanup and truncate params.
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a3360a1c5e..abbfb616ce 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -68,6 +68,8 @@ extern void FreeSnapshotBuilder(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
+extern Snapshot SnapBuildInitialSnapshotForCluster(SnapBuild *builder);
+extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place);
 extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
 extern void SnapBuildClearExportedSnapshot(void);
 extern void SnapBuildResetExportedSnapshotState(void);
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84f6a..cac3d7f8c7 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -36,7 +36,7 @@ typedef int LOCKMODE;
 #define AccessShareLock			1	/* SELECT */
 #define RowShareLock			2	/* SELECT FOR UPDATE/FOR SHARE */
 #define RowExclusiveLock		3	/* INSERT, UPDATE, DELETE */
-#define ShareUpdateExclusiveLock 4	/* VACUUM (non-FULL), ANALYZE, CREATE
+#define ShareUpdateExclusiveLock 4	/* VACUUM (non-exclusive), ANALYZE, CREATE
 									 * INDEX CONCURRENTLY */
 #define ShareLock				5	/* CREATE INDEX (WITHOUT CONCURRENTLY) */
 #define ShareRowExclusiveLock	6	/* like EXCLUSIVE MODE, but allows ROW
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 6a2f64c54f..a5f59b6c12 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, ClusteredRels)
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index e09598eafc..5ab5df9d41 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -35,7 +35,7 @@ typedef enum ProgressCommandType
 
 /*
  * Any command which wishes can advertise that it is running by setting
- * command, command_target, and param[].  command_target should be the OID of
+ * ommand, command_target, and param[].  command_target should be the OID of
  * the relation which the command targets (we assume there's just one, as this
  * is meant for utility commands), but the meaning of each element in the
  * param array is command-specific.
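The comment above describes the progress state as command, command_target and param[], and the patch adds pgstat_progress_restore_state() to write a saved copy back. A minimal standalone sketch of that save/restore pattern follows. It is only an illustration under stated assumptions: DemoProgress, demo_current, demo_progress_save() and demo_progress_restore() are hypothetical names, not PostgreSQL APIs, and the sketch omits the PGSTAT_BEGIN_WRITE_ACTIVITY / PGSTAT_END_WRITE_ACTIVITY bracketing that the real function uses.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for PgBackendProgress; not the real definition. */
#define DEMO_NUM_PARAM 20

typedef struct DemoProgress
{
	int			command;		/* which command is reporting progress */
	uint32_t	command_target; /* OID-like identifier of the target */
	int64_t		param[DEMO_NUM_PARAM];	/* command-specific counters */
} DemoProgress;

/* Hypothetical per-backend progress slot (assumption for this sketch). */
DemoProgress demo_current;

/* Take a copy of the current progress state before it gets overwritten. */
void
demo_progress_save(DemoProgress *backup)
{
	assert(backup != NULL);
	*backup = demo_current;
}

/* Write a previously saved state back, field by field as in the patch. */
void
demo_progress_restore(const DemoProgress *backup)
{
	assert(backup != NULL);
	demo_current.command = backup->command;
	demo_current.command_target = backup->command_target;
	memcpy(demo_current.param, backup->param, sizeof(demo_current.param));
}
```

A caller would save the state before running a step that reports its own progress and restore it afterwards, so the outer command's counters reappear unchanged.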
@@ -55,6 +55,7 @@ extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
 											   const int64 *val);
 extern void pgstat_progress_end_command(void);
+extern void pgstat_progress_restore_state(PgBackendProgress *backup);
 
 
 #endif							/* BACKEND_PROGRESS_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 24695facf2..4acf9d0ed9 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -42,6 +42,8 @@ extern void CacheInvalidateCatalog(Oid catalogId);
 
 extern void CacheInvalidateRelcache(Relation relation);
 
+extern void CacheInvalidateRelcacheImmediate(Relation relation);
+
 extern void CacheInvalidateRelcacheAll(void);
 
 extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8700204953..adda46c985 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -253,6 +253,9 @@ typedef struct RelationData
 	bool		pgstat_enabled; /* should relation stats be counted */
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+	/* Is CLUSTER CONCURRENTLY being performed on this relation? */
+	bool		rd_cluster_concurrent;
 } RelationData;
 
 
@@ -684,7 +687,9 @@ RelationCloseSmgr(Relation relation)
 #define RelationIsAccessibleInLogicalDecoding(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
-	 (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
+	 (IsCatalogRelation(relation) || \
+	  RelationIsUsedAsCatalogTable(relation) || \
+	  (relation)->rd_cluster_concurrent))
 
 /*
  * RelationIsLogicallyLogged
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..f58c9108fc 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -68,6 +68,9 @@ extern Snapshot GetLatestSnapshot(void);
 extern void SnapshotSetCommandId(CommandId curcid);
 extern Snapshot GetOldestSnapshot(void);
 
+extern Snapshot CopySnapshot(Snapshot snapshot);
+extern void FreeSnapshot(Snapshot snapshot);
+
 extern Snapshot GetCatalogSnapshot(Oid relid);
 extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid);
 extern void InvalidateCatalogSnapshot(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 4c789279e5..22cb0702dc 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1958,17 +1958,20 @@ pg_stat_progress_cluster| SELECT s.pid,
             WHEN 2 THEN 'index scanning heap'::text
             WHEN 3 THEN 'sorting tuples'::text
             WHEN 4 THEN 'writing new heap'::text
-            WHEN 5 THEN 'swapping relation files'::text
-            WHEN 6 THEN 'rebuilding index'::text
-            WHEN 7 THEN 'performing final cleanup'::text
+            WHEN 5 THEN 'catch-up'::text
+            WHEN 6 THEN 'swapping relation files'::text
+            WHEN 7 THEN 'rebuilding index'::text
+            WHEN 8 THEN 'performing final cleanup'::text
             ELSE NULL::text
         END AS phase,
     (s.param3)::oid AS cluster_index_relid,
     s.param4 AS heap_tuples_scanned,
-    s.param5 AS heap_tuples_written,
-    s.param6 AS heap_blks_total,
-    s.param7 AS heap_blks_scanned,
-    s.param8 AS index_rebuild_count
+    s.param5 AS heap_tuples_inserted,
+    s.param6 AS heap_tuples_updated,
+    s.param7 AS heap_tuples_deleted,
+    s.param8 AS heap_blks_total,
+    s.param9 AS heap_blks_scanned,
+    s.param10 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_copy| SELECT s.pid,
-- 
2.45.2

From 8acfb903cb62baabea2b32174ce98b78d840e068 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:46:00 +0200
Subject: [PATCH 4/4] Call logical_rewrite_heap_tuple() when applying concurrent
 data changes.

This was implemented for the sake of completeness, but I think it's currently
not needed. Possible use cases could be:

1. VACUUM FULL / CLUSTER CONCURRENTLY can process system catalogs.

   System catalogs are scanned using a historic snapshot during logical
   decoding, and the "combo CIDs" information is needed for that. Since "combo
   CID" is associated with the "file locator" and that locator is changed by
   VACUUM FULL / CLUSTER, these commands must record the information on
   individual tuples being moved from the old file to the new one. This is
   what logical_rewrite_heap_tuple() does. However, the logical decoding
   subsystem currently does not support decoding of data changes in the system
   catalog. Therefore, the CONCURRENTLY option cannot be used for system
   catalogs.

2. VACUUM FULL / CLUSTER CONCURRENTLY is processing a relation, but once it
   has released all the locks (in order to get the exclusive lock), another
   backend runs VACUUM FULL / CLUSTER CONCURRENTLY on the same table.

   Since the relation is treated as a system catalog while these commands are
   processing it (so it can be scanned using a historic snapshot during the
   "initial load"), it is important that the 2nd backend does not break
   decoding of the "combo CIDs" performed by the 1st backend.
   However, it's not practical to let multiple backends run VACUUM FULL /
   CLUSTER CONCURRENTLY on the same relation, so we forbid that.
---
 src/backend/access/heap/heapam_handler.c      |   2 +-
 src/backend/access/heap/rewriteheap.c         |  65 ++++++-----
 src/backend/commands/cluster.c                | 102 ++++++++++++++----
 src/backend/replication/logical/decode.c      |  41 ++++++-
 .../pgoutput_cluster/pgoutput_cluster.c       |  21 ++--
 src/include/access/rewriteheap.h              |   5 +-
 src/include/commands/cluster.h                |   3 +
 src/include/replication/reorderbuffer.h       |   7 ++
 8 files changed, 187 insertions(+), 59 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 02fd6d2983..cccfff62bd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -735,7 +735,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
 	/* Initialize the rewrite operation */
 	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-								 *multi_cutoff);
+								 *multi_cutoff, true);
 
 
 	/* Set up sorting if wanted */
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 473f3aa9be..050c8306da 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -214,10 +214,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
 static void logical_begin_heap_rewrite(RewriteState state);
-static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
 static void logical_end_heap_rewrite(RewriteState state);
 
-
 /*
  * Begin a rewrite of a table
  *
@@ -226,18 +224,19 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin	xid used by the caller to determine which tuples are dead
  * freeze_xid	xid before which tuples will be frozen
  * cutoff_multi	multixact before which multis will be removed
+ * tid_chains	need to maintain TID chains?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-				   TransactionId freeze_xid, MultiXactId cutoff_multi)
+				   TransactionId freeze_xid, MultiXactId cutoff_multi,
+				   bool tid_chains)
 {
 	RewriteState state;
 	MemoryContext rw_cxt;
 	MemoryContext old_cxt;
-	HASHCTL		hash_ctl;
 
 	/*
 	 * To ease cleanup, make a separate context that will contain the
@@ -262,29 +261,34 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_cxt = rw_cxt;
 	state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
 
-	/* Initialize hash tables used to track update chains */
-	hash_ctl.keysize = sizeof(TidHashKey);
-	hash_ctl.entrysize = sizeof(UnresolvedTupData);
-	hash_ctl.hcxt = state->rs_cxt;
-
-	state->rs_unresolved_tups =
-		hash_create("Rewrite / Unresolved ctids",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+	if (tid_chains)
+	{
+		HASHCTL		hash_ctl;
+
+		/* Initialize hash tables used to track update chains */
+		hash_ctl.keysize = sizeof(TidHashKey);
+		hash_ctl.entrysize = sizeof(UnresolvedTupData);
+		hash_ctl.hcxt = state->rs_cxt;
+
+		state->rs_unresolved_tups =
+			hash_create("Rewrite / Unresolved ctids",
+						128,	/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+		state->rs_old_new_tid_map =
+			hash_create("Rewrite / Old to new tid map",
+						128,	/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
 
-	state->rs_old_new_tid_map =
-		hash_create("Rewrite / Old to new tid map",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	logical_begin_heap_rewrite(state);
 
 	MemoryContextSwitchTo(old_cxt);
 
-	logical_begin_heap_rewrite(state);
-
 	return state;
 }
 
@@ -303,12 +307,15 @@ end_heap_rewrite(RewriteState state)
 	 * Write any remaining tuples in the UnresolvedTups table. If we have any
 	 * left, they should in fact be dead, but let's err on the safe side.
 	 */
-	hash_seq_init(&seq_status, state->rs_unresolved_tups);
-
-	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+	if (state->rs_unresolved_tups)
 	{
-		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
-		raw_heap_insert(state, unresolved->tuple);
+		hash_seq_init(&seq_status, state->rs_unresolved_tups);
+
+		while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+		{
+			ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+			raw_heap_insert(state, unresolved->tuple);
+		}
 	}
 
 	/* Write the last page, if any */
@@ -995,7 +1002,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
  * Perform logical remapping for a tuple that's mapped from old_tid to
  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
  */
-static void
+void
 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
 						   HeapTuple new_tuple)
 {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 6397f7f8c4..42e8118b7d 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -21,6 +21,7 @@
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relscan.h"
+#include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/transam.h"
@@ -179,17 +180,21 @@ static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 static HeapTuple get_changed_tuple(ConcurrentChange *change);
 static void apply_concurrent_changes(ClusterDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
-									 IndexInsertState *iistate);
+									 IndexInsertState *iistate,
+									 RewriteState rwstate);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_update(Relation rel, HeapTuple tup,
 									HeapTuple tup_target,
 									ConcurrentChange *change,
 									IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-									ConcurrentChange *change);
+									ConcurrentChange *change,
+									RewriteState rwstate);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
 								   Snapshot snapshot,
@@ -202,7 +207,8 @@ static void process_concurrent_changes(LogicalDecodingContext *ctx,
 									   Relation rel_src,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
-									   IndexInsertState *iistate);
+									   IndexInsertState *iistate,
+									   RewriteState rwstate);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
 static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
@@ -3073,7 +3079,8 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
-						 ScanKey key, int nkeys, IndexInsertState *iistate)
+						 ScanKey key, int nkeys, IndexInsertState *iistate,
+						 RewriteState rwstate)
 {
 	TupleTableSlot *index_slot, *ident_slot;
 	HeapTuple	tup_old = NULL;
@@ -3144,7 +3151,8 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 		{
 			Assert(tup_old == NULL);
 
-			apply_concurrent_insert(rel, change, tup, iistate, index_slot);
+			apply_concurrent_insert(rel, change, tup, iistate, index_slot,
+									rwstate);
 
 			pfree(tup);
 		}
@@ -3152,7 +3160,7 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 				 change->kind == CHANGE_DELETE)
 		{
 			IndexScanDesc ind_scan = NULL;
-			HeapTuple	tup_key;
+			HeapTuple	tup_key, tup_exist_cp;
 
 			if (change->kind == CHANGE_UPDATE_NEW)
 			{
@@ -3193,11 +3201,23 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
 
+			/*
+			 * Update the mapping for xmax of the old version.
+			 *
+			 * Use a copy ('tup_exist' can point to shared buffer) with xmin
+			 * invalid because mapping of that should have been written on
+			 * insertion.
+			 */
+			tup_exist_cp = heap_copytuple(tup_exist);
+			HeapTupleHeaderSetXmin(tup_exist_cp->t_data, InvalidTransactionId);
+			logical_rewrite_heap_tuple(rwstate, change->old_tid, tup_exist_cp);
+			pfree(tup_exist_cp);
+
 			if (change->kind == CHANGE_UPDATE_NEW)
 				apply_concurrent_update(rel, tup, tup_exist, change, iistate,
-										index_slot);
+										index_slot, rwstate);
 			else
-				apply_concurrent_delete(rel, tup_exist, change);
+				apply_concurrent_delete(rel, tup_exist, change, rwstate);
 
 			ResetClusterCurrentXids();
 
@@ -3238,9 +3258,12 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
-						IndexInsertState *iistate, TupleTableSlot *index_slot)
+						IndexInsertState *iistate, TupleTableSlot *index_slot,
+						RewriteState rwstate)
 {
+	HeapTupleHeader tup_hdr = tup->t_data;
 	Snapshot	snapshot = change->snapshot;
+	ItemPointerData old_tid;
 	List	   *recheck;
 
 	/*
@@ -3250,6 +3273,9 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 */
 	SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
+	/* Remember location in the old heap. */
+	ItemPointerCopy(&tup_hdr->t_ctid, &old_tid);
+
 	/*
 	 * Write the tuple into the new heap.
 	 *
@@ -3265,6 +3291,14 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
 				HEAP_INSERT_NO_LOGICAL, NULL);
 
+	/*
+	 * Update the mapping for xmin. (xmax should be invalid). This is needed
+	 * because, during the processing, the table is considered an "user
+	 * catalog".
+	 */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, old_tid, tup);
+
 	/*
 	 * Update indexes.
* @@ -3298,16 +3332,19 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, static void apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, ConcurrentChange *change, IndexInsertState *iistate, - TupleTableSlot *index_slot) + TupleTableSlot *index_slot, RewriteState rwstate) { List *recheck; LockTupleMode lockmode; TU_UpdateIndexes update_indexes; - ItemPointerData tid_old_new_heap; + ItemPointerData tid_new_old_heap, tid_old_new_heap; TM_Result res; Snapshot snapshot = change->snapshot; TM_FailureData tmfd; + /* Location of the new tuple in the old heap. */ + ItemPointerCopy(&tup->t_data->t_ctid, &tid_new_old_heap); + /* Location of the existing tuple in the new heap. */ ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap); @@ -3330,6 +3367,10 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, if (res != TM_Ok) ereport(ERROR, (errmsg("failed to apply concurrent UPDATE"))); + /* Update the mapping for xmin of the new version. 
*/ + Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data))); + logical_rewrite_heap_tuple(rwstate, tid_new_old_heap, tup); + ExecStoreHeapTuple(tup, index_slot, false); if (update_indexes != TU_None) @@ -3353,7 +3394,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, static void apply_concurrent_delete(Relation rel, HeapTuple tup_target, - ConcurrentChange *change) + ConcurrentChange *change, RewriteState rwstate) { ItemPointerData tid_old_new_heap; TM_Result res; @@ -3444,7 +3485,8 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, static void process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal, Relation rel_dst, Relation rel_src, ScanKey ident_key, - int ident_key_nentries, IndexInsertState *iistate) + int ident_key_nentries, IndexInsertState *iistate, + RewriteState rwstate) { ClusterDecodingState *dstate; @@ -3468,7 +3510,7 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal, rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid; apply_concurrent_changes(dstate, rel_dst, ident_key, - ident_key_nentries, iistate); + ident_key_nentries, iistate, rwstate); } PG_FINALLY(); { @@ -3631,6 +3673,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, bool is_system_catalog; Oid ident_idx_old, ident_idx_new; IndexInsertState *iistate; + RewriteState rwstate; ScanKey ident_key; int ident_key_nentries; XLogRecPtr wal_insert_ptr, end_of_wal; @@ -3708,10 +3751,26 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * Apply concurrent changes first time, to minimize the time we need to * hold AccessExclusiveLock. (Quite some amount of WAL could have been * written during the data copying and index creation.) + * + * Now we are processing individual tuples, so pass false for + * 'tid_chains'. Since rwstate is now only needed for + * logical_begin_heap_rewrite(), none of the transaction IDs needs to be + * valid. 
*/ + rwstate = begin_heap_rewrite(OldHeap, NewHeap, + InvalidTransactionId, + InvalidTransactionId, + InvalidTransactionId, + false); process_concurrent_changes(ctx, end_of_wal, NewHeap, swap_toast_by_content ? OldHeap : NULL, - ident_key, ident_key_nentries, iistate); + ident_key, ident_key_nentries, iistate, + rwstate); + /* + * OldHeap will be closed, so we need to initialize rwstate again for the + * next call of process_concurrent_changes(). + */ + end_heap_rewrite(rwstate); /* * Release the locks that allowed concurrent data changes, in order to @@ -3833,9 +3892,16 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, end_of_wal = GetFlushRecPtr(NULL); /* Apply the concurrent changes again. */ + rwstate = begin_heap_rewrite(OldHeap, NewHeap, + InvalidTransactionId, + InvalidTransactionId, + InvalidTransactionId, + false); process_concurrent_changes(ctx, end_of_wal, NewHeap, swap_toast_by_content ? OldHeap : NULL, - ident_key, ident_key_nentries, iistate); + ident_key, ident_key_nentries, iistate, + rwstate); + end_heap_rewrite(rwstate); /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 066d96dea2..69a43e3510 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -951,11 +951,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; + HeapTupleHeader tuphdr; xlrec = (xl_heap_insert *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -980,6 +982,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) 
DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + /* + * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER + * CONCURRENTLY. + */ + tuphdr = change->data.tp.newtuple->t_data; + ItemPointerSet(&tuphdr->t_ctid, blknum, xlrec->offnum); + change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -1001,11 +1010,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferChange *change; char *data; RelFileLocator target_locator; + BlockNumber old_blknum, new_blknum; xlrec = (xl_heap_update *) XLogRecGetData(r); + /* Retrieve blknum, so that we can compose CTID below. */ + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum); + /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1022,6 +1034,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { Size datalen; Size tuplelen; + HeapTupleHeader tuphdr; data = XLogRecGetBlockData(r, 0, &datalen); @@ -1031,6 +1044,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); + + /* + * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER + * CONCURRENTLY. + */ + tuphdr = change->data.tp.newtuple->t_data; + ItemPointerSet(&tuphdr->t_ctid, new_blknum, xlrec->new_offnum); } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) @@ -1049,6 +1069,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } + /* + * Remember the old tuple CTID, for the sake of + * logical_rewrite_heap_tuple(). 
+ */ + if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blknum, NULL)) + old_blknum = new_blknum; + ItemPointerSet(&change->data.tp.old_tid, old_blknum, xlrec->old_offnum); + change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -1067,11 +1095,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1103,6 +1132,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple); + + /* + * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER + * CONCURRENTLY. 
+ */ + ItemPointerSet(&change->data.tp.old_tid, blknum, xlrec->offnum); } change->data.tp.clear_toast_afterwards = true; diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c index 9fe44017a8..2c33fbad82 100644 --- a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c +++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c @@ -34,7 +34,7 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferChange *change); static void store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, HeapTuple tuple, - TransactionId xid); + TransactionId xid, ItemPointer old_tid); void _PG_output_plugin_init(OutputPluginCallbacks *cb) @@ -162,7 +162,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (newtuple == NULL) elog(ERROR, "Incomplete insert info."); - store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid); + store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid, + NULL); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -180,10 +181,10 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (oldtuple != NULL) store_change(ctx, CHANGE_UPDATE_OLD, oldtuple, - change->txn->xid); + change->txn->xid, NULL); store_change(ctx, CHANGE_UPDATE_NEW, newtuple, - change->txn->xid); + change->txn->xid, &change->data.tp.old_tid); } break; case REORDER_BUFFER_CHANGE_DELETE: @@ -196,7 +197,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (oldtuple == NULL) elog(ERROR, "Incomplete delete info."); - store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid); + store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid, + &change->data.tp.old_tid); } break; default: @@ -230,13 +232,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (i == nrelations) return; - store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId); + store_change(ctx, CHANGE_TRUNCATE, 
NULL, InvalidTransactionId, NULL); } /* Store concurrent data change. */ static void store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, - HeapTuple tuple, TransactionId xid) + HeapTuple tuple, TransactionId xid, ItemPointer old_tid) { ClusterDecodingState *dstate; char *change_raw; @@ -301,6 +303,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, change->snapshot = dstate->snapshot; dstate->snapshot->active_count++; + if (old_tid) + ItemPointerCopy(old_tid, &change->old_tid); + else + ItemPointerSetInvalid(&change->old_tid); + /* The data has been copied. */ if (flattened) pfree(tuple); diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h index 5866a26bdd..de62b6abf8 100644 --- a/src/include/access/rewriteheap.h +++ b/src/include/access/rewriteheap.h @@ -23,11 +23,14 @@ typedef struct RewriteStateData *RewriteState; extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, - MultiXactId cutoff_multi); + MultiXactId cutoff_multi, bool tid_chains); extern void end_heap_rewrite(RewriteState state); extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple); extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple); +extern void logical_rewrite_heap_tuple(RewriteState state, + ItemPointerData old_tid, + HeapTuple new_tuple); /* * On-Disk data format for an individual logical rewrite mapping. diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index f98b855f21..c394ef3871 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -71,6 +71,9 @@ typedef struct ConcurrentChange /* Transaction that changes the data. */ TransactionId xid; + /* For UPDATE / DELETE, the location of the old tuple version. */ + ItemPointerData old_tid; + /* * Historic catalog snapshot that was used to decode this change. 
*/ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 851a001c8b..1fa8f8bd6a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -99,6 +99,13 @@ typedef struct ReorderBufferChange HeapTuple oldtuple; /* valid for INSERT || UPDATE */ HeapTuple newtuple; + + /* + * CLUSTER CONCURRENTLY needs the old TID, even if the old tuple + * itself is not WAL-logged (i.e. when the identity key does not + * change). + */ + ItemPointerData old_tid; } tp; /* -- 2.45.2
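The TID bookkeeping added by the decode.c and pgoutput_cluster.c hunks above can be looked at in isolation. What follows is only a minimal sketch, not PostgreSQL code: `Tid`, `compose_old_tid()` and `store_old_tid()` are hypothetical stand-ins for ItemPointerData, the old_blknum fallback in DecodeUpdate() and the old_tid handling in store_change(). It assumes, as the patch's fallback suggests, that an UPDATE record carries a second block reference only when the old tuple version lives on a different page than the new one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for ItemPointerData: block number plus line offset. */
typedef struct Tid
{
    uint32_t block;
    uint16_t offset;            /* 0 is treated as "invalid" in this sketch */
} Tid;

static void
tid_set_invalid(Tid *tid)
{
    tid->block = UINT32_MAX;
    tid->offset = 0;
}

static bool
tid_is_valid(const Tid *tid)
{
    return tid->offset != 0;
}

/*
 * Models the fallback in the DecodeUpdate() hunk: if the record has no
 * separate block reference for the old tuple, the old version is assumed
 * to be on the same block as the new one.
 */
static Tid
compose_old_tid(bool has_old_block, uint32_t old_block,
                uint32_t new_block, uint16_t old_offset)
{
    Tid tid;

    tid.block = has_old_block ? old_block : new_block;
    tid.offset = old_offset;
    return tid;
}

/*
 * Models the store_change() hunk: copy the old TID when the caller has one
 * (UPDATE_NEW and DELETE in the patch), otherwise mark it invalid (INSERT,
 * UPDATE_OLD and TRUNCATE pass NULL).
 */
static void
store_old_tid(Tid *dst, const Tid *old_tid)
{
    if (old_tid != NULL)
        *dst = *old_tid;
    else
        tid_set_invalid(dst);
}
```

The point of the sketch is only that the old location is always recorded, even when the old tuple itself is not WAL-logged, so that the applying side has something to pass to logical_rewrite_heap_tuple().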
Antonin Houska <ah@cybertec.at> wrote: > Alvaro Herrera <alvherre@alvh.no-ip.org> wrote: > > > > Is your plan to work on it soon or should I try to write a draft patch? (I > > > assume this is for PG >= 18.) > > > > I don't have plans for it, so if you have resources, please go for it. > > The first version is attached. The actual feature is in 0003. 0004 is probably > not necessary now, but I haven't realized until I coded it. The mailing list archive indicates something is wrong with the 0003 attachment. Sending it all again, as *.tar. -- Antonin Houska Web: https://www.cybertec-postgresql.com
On 2024-Jul-09, Antonin Houska wrote: > Alvaro Herrera <alvherre@alvh.no-ip.org> wrote: > > > > Is your plan to work on it soon or should I try to write a draft patch? (I > > > assume this is for PG >= 18.) > > > > I don't have plans for it, so if you have resources, please go for it. > > The first version is attached. The actual feature is in 0003. 0004 is probably > not necessary now, but I haven't realized until I coded it. Thank you, this is great. I'll be studying this during the next commitfest. BTW I can apply 0003 from this email perfectly fine, but you're right that the archives don't show the file name. I suspect the "Content-Disposition: inline" PLUS the Content-Type text/plain are what cause the problem -- for instance, [1] doesn't have a problem and they do have inline content disposition, but the content-type is not text/plain. In any case, I encourage you not to send patches as tarballs :-) [1] https://postgr.es/m/32781.1714378236@antos -- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ "La primera ley de las demostraciones en vivo es: no trate de usar el sistema. Escriba un guión que no toque nada para no causar daños." (Jakob Nielsen)
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote: > On 2024-Jul-09, Antonin Houska wrote: > > > Alvaro Herrera <alvherre@alvh.no-ip.org> wrote: > > > > > > Is your plan to work on it soon or should I try to write a draft patch? (I > > > > assume this is for PG >= 18.) > > > > > > I don't have plans for it, so if you have resources, please go for it. > > > > The first version is attached. The actual feature is in 0003. 0004 is probably > > not necessary now, but I haven't realized until I coded it. > > Thank you, this is great. I'll be studying this during the next > commitfest. Thanks. I'll register it in the CF application. > BTW I can apply 0003 from this email perfectly fine, but you're right > that the archives don't show the file name. I suspect the > "Content-Disposition: inline" PLUS the Content-Type text/plain are what > cause the problem -- for instance, [1] doesn't have a problem and they > do have inline content disposition, but the content-type is not > text/plain. In any case, I encourage you not to send patches as > tarballs :-) > > [1] https://postgr.es/m/32781.1714378236@antos You're right, "Content-Disposition" is the problem. I forgot that "attachment" is better for patches and my email client (emacs+nmh) defaults to "inline". I'll pay attention next time. -- Antonin Houska Web: https://www.cybertec-postgresql.com
Hi! I'm interested in having the VACUUM FULL CONCURRENTLY feature in core, so I will try to review the patch set and give useful feedback. For now, just a few thoughts. > The first version is attached. The actual feature is in 0003. 0004 is probably > not necessary now, but I haven't realized until I coded it. The logical-replication-based approach to vacuum is a really smart idea, I like it. As far as I understand, pg_squeeze works well in real production databases, which gives us hope that the concurrent vacuum feature in core will work well too. What is the size of the biggest relation successfully vacuumed via pg_squeeze? It looks like with a big relation or a high insertion load, replication may lag and never catch up. However, in general, the 3rd patch is really big and very hard to comprehend. Please consider splitting it into smaller (and reviewable) pieces. Also, we obviously need more tests for this, both TAP tests and regression tests, I suppose. One more thing is about the pg_squeeze background workers. They act in an autovacuum-like fashion, don't they? Maybe we can support this kind of relation processing in core too?
Hi
On Sun, Jul 21, 2024 at 17:13, Kirill Reshke <reshkekirill@gmail.com> wrote:
> Hi!
> I'm interested in the vacuum concurrently feature being inside the
> core, so will try to review patch set and give valuable feedback. For
> now, just a few little thoughts..
> One more thing is about pg_squeeze background workers. They act in an
> autovacuum-like fashion, aren't they? Maybe we can support this kind
> of relation processing in core too?
I don't think that is necessary once this becomes an in-core feature.
I agree that this feature is very important. I proposed it (and I am very happy that Tonda implemented it), but I am not sure whether its usage should be automated, and if it should be, then
a) autovacuum should probably do it,
b) can we please postpone that discussion until VACUUM FULL CONCURRENTLY is merged upstream? It isn't very practical to have too many open targets.
Regards
Pavel
> Also, we obviously need more tests on this. Both tap-test and > regression tests I suppose.
One simple test for this patch could be done this way:
1) Create a test relation (call it vac_conc_r1, for example) and fill it with dead tuples (insert + update or insert + delete).
2) Create an injection point that prevents the concurrent vacuum from completing.
3) Run the concurrent vacuum (VACUUM FULL CONCURRENTLY) in a separate thread or in some other asynchronous way.
4) Insert new data into vac_conc_r1.
5) Release the injection point and assert that the vacuum completed successfully.
6) Check that all data is present in vac_conc_r1 (the data from step 1 and from step 4).
This way we can catch some basic bugs if some code paths of VACUUM FULL CONCURRENTLY are touched in the future. The problem with this test is that I don't know how to do anything asynchronous in the current TAP tests (needed in step 3). Also, a test with asynchronous interaction may be too flaky (producing spurious failures) to maintain. A sequential test for this feature would be much better, but I can't think of one. Also, should we create a CF entry for this thread already?
On Mon, Jul 22, 2024 at 01:23:03PM +0500, Kirill Reshke wrote: > Also, should we create a cf entry for this thread already? I was wondering about this as well, but there is one for the upcoming commitfest already: https://commitfest.postgresql.org/49/5117/ Michael
Hi! On Tue, 30 Jan 2024 at 15:31, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote: > FWIW a newer, more modern and more trustworthy alternative to pg_repack > is pg_squeeze, which I discovered almost by random chance, and soon > discovered I liked it much more. Can you please clarify this a bit more? What is the exact reason for pg_squeeze being more trustworthy than pg_repack? Is there something about the logical replication approach that makes it more bulletproof than the trigger-based repack approach? Also, I was thinking about pg_repack vs pg_squeeze being used for the VACUUM FULL CONCURRENTLY feature, and I'm a bit suspicious about the latter. If I understand correctly, we essentially parse the whole WAL to obtain info about one particular relation changes. That may be a big overhead, whereas the trigger approach does not suffer from this. So, there is the chance that VACUUM FULL CONCURRENTLY will never keep up with vacuumed relation changes. Am I right?
Kirill Reshke <reshkekirill@gmail.com> wrote: > Also, I was thinking about pg_repack vs pg_squeeze being used for the > VACUUM FULL CONCURRENTLY feature, and I'm a bit suspicious about the > latter. > If I understand correctly, we essentially parse the whole WAL to > obtain info about one particular relation changes. That may be a big > overhead, pg_squeeze is an extension but the logical decoding is performed by the core, so there is no way to ensure that data changes of the "other tables" are not decoded. However, it might be possible if we integrate the functionality into the core. I'll consider doing so in the next version of [1]. > whereas the trigger approach does not suffer from this. So, there is the > chance that VACUUM FULL CONCURRENTLY will never keep up with vacuumed > relation changes. Am I right? Perhaps it can happen, but note that trigger processing is also not free and that in this case the cost is paid by the applications. So while VACUUM FULL CONCURRENTLY (based on logical decoding) might fail to catch-up, the trigger based solution may slow down the applications that execute DML commands while the table is being rewritten. [1] https://commitfest.postgresql.org/49/5117/ -- Antonin Houska Web: https://www.cybertec-postgresql.com
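The overhead argument in this exchange can be made concrete with a toy model. This is only a sketch under stated assumptions: `WalRecord`, `DecodeStats` and `decode_for_table()` are hypothetical and much simpler than the real decoder. The database check mirrors the "only interested in our database" test in the decode.c hunks, while the per-table check is the kind of early filter that an in-core implementation might add.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified WAL record: which database and table it touches. */
typedef struct WalRecord
{
    uint32_t db_oid;
    uint32_t rel_oid;
} WalRecord;

typedef struct DecodeStats
{
    size_t scanned;             /* records the decoder had to read */
    size_t queued;              /* records kept for the table being rewritten */
} DecodeStats;

static DecodeStats
decode_for_table(const WalRecord *records, size_t nrecords,
                 uint32_t my_db, uint32_t target_rel)
{
    DecodeStats stats = {0, 0};

    for (size_t i = 0; i < nrecords; i++)
    {
        stats.scanned++;        /* the read cost is paid for every record */

        if (records[i].db_oid != my_db)
            continue;           /* change belongs to another database */
        if (records[i].rel_oid != target_rel)
            continue;           /* assumed early filter: another table */

        stats.queued++;
    }
    return stats;
}
```

In this model `scanned` grows with the total WAL volume no matter how quiet the target table is, which is Kirill's concern. The early filter only reduces how much work each irrelevant record costs after it has been read.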
Kirill Reshke <reshkekirill@gmail.com> wrote: > What is the size of the biggest relation successfully vacuumed > via pg_squeeze? > Looks like in case of big relation or high insertion load, > replication may lag and never catch up... Users report problems rather than successes, so I don't know. 400 GB was reported in [1] but it's possible that the table size for this test was determined based on available disk space. I think that the amount of data changes performed during the "squeezing" matters more than the table size. In [2] one user reported "thousands of UPSERTs per second", but the amount of data also depends on row size, which he didn't mention. pg_squeeze gives up if it fails to catch up a few times. The first version of my patch does not check this; I'll add the corresponding code in the next version. > However, in general, the 3rd patch is really big, very hard to > comprehend. Please consider splitting this into smaller (and > reviewable) pieces. I'll try to move some preparation steps into separate diffs, but not sure if that will make the main diff much smaller. I prefer self-contained patches, as also explained in [3]. > Also, we obviously need more tests on this. Both tap-test and > regression tests I suppose. Sure. The next version will use the injection points to test if "concurrent data changes" are processed correctly. > One more thing is about pg_squeeze background workers. They act in an > autovacuum-like fashion, aren't they? Maybe we can support this kind > of relation processing in core too? Maybe later. Even just adding the CONCURRENTLY option to CLUSTER and VACUUM FULL requires quite some effort. [1] https://github.com/cybertec-postgresql/pg_squeeze/issues/51 [2] https://github.com/cybertec-postgresql/pg_squeeze/issues/21#issuecomment-514495369 [3] http://peter.eisentraut.org/blog/2024/05/14/when-to-split-patches-for-postgresql -- Antonin Houska Web: https://www.cybertec-postgresql.com
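The "fails to catch up" scenario can be reasoned about with simple arithmetic. The sketch below is an assumption-laden model, not what pg_squeeze or the patch actually does: `rounds_to_catch_up()` and all of its parameters are hypothetical. It assumes constant rates, that applying a backlog of B changes takes B / apply_rate seconds, and that write_rate new changes per second accumulate in the meantime. The `threshold` stands for a backlog small enough to be applied while the exclusive lock is held.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Returns the number of apply passes needed until the backlog is at most
 * 'threshold', or -1 if the rewrite should give up: either the backlog
 * cannot shrink (writes arrive at least as fast as they are applied) or
 * 'max_rounds' passes were not enough.
 *
 * Integer division rounds each new backlog down; very large inputs could
 * overflow the multiplication, which this sketch does not guard against.
 */
static int
rounds_to_catch_up(uint64_t backlog, uint64_t write_rate, uint64_t apply_rate,
                   uint64_t threshold, int max_rounds)
{
    int rounds = 0;

    while (backlog > threshold)
    {
        if (rounds == max_rounds || apply_rate == 0 ||
            write_rate >= apply_rate)
            return -1;

        /* Changes that arrived while the previous backlog was applied. */
        backlog = backlog * write_rate / apply_rate;
        rounds++;
    }
    return rounds;
}
```

Under these assumptions each pass shrinks the backlog by the factor write_rate / apply_rate, so what decides the outcome is the ratio of the two rates rather than the table size. That is consistent with the remark that the amount of data changes matters more than the size of the table.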
On Fri, 2 Aug 2024 at 11:09, Antonin Houska <ah@cybertec.at> wrote: > > Kirill Reshke <reshkekirill@gmail.com> wrote: > > However, in general, the 3rd patch is really big, very hard to > > comprehend. Please consider splitting this into smaller (and > > reviewable) pieces. > > I'll try to move some preparation steps into separate diffs, but not sure if > that will make the main diff much smaller. I prefer self-contained patches, as > also explained in [3]. Thanks for sharing [3], it is a useful link. There is actually one more case when ACCESS EXCLUSIVE is held: during a table rewrite (ALTER TABLE ... SET ACCESS METHOD, SET TABLESPACE and ALTER COLUMN ... TYPE are some examples). This can be done CONCURRENTLY too, using the same logical replication approach, or am I missing something? I'm not saying we must do it immediately, this should be a separate thread, but we can do some preparation work here. I can see that a bunch of functions which are currently placed in cluster.c can be moved to something like logical_rewrite_heap.c. The ConcurrentChange struct and the apply_concurrent_insert function are examples of such code. So, if this is the case, the 0003 patch can be split in two: the first with general utility code for logical table rewrite, the second with the actual VACUUM CONCURRENTLY feature. What do you think?
Hi, On Tue, Aug 27, 2024 at 8:01 PM Antonin Houska <ah@cybertec.at> wrote: > > Attached is version 2, the feature itself is now in 0004. > > Unlike version 1, it contains some regression tests (0006) and a new GUC to > control how long the AccessExclusiveLock may be held (0007). > > Kirill Reshke <reshkekirill@gmail.com> wrote: > > > On Fri, 2 Aug 2024 at 11:09, Antonin Houska <ah@cybertec.at> wrote: > > > > > > Kirill Reshke <reshkekirill@gmail.com> wrote: > > > > However, in general, the 3rd patch is really big, very hard to > > > > comprehend. Please consider splitting this into smaller (and > > > > reviewable) pieces. > > > > > > I'll try to move some preparation steps into separate diffs, but not sure if > > > that will make the main diff much smaller. I prefer self-contained patches, as > > > also explained in [3]. > > > > Thanks for sharing [3], it is a useful link. > > > > There is actually one more case when ACCESS EXCLUSIVE is held: during > > table rewrite (AT set TAM, AT set Tablespace and AT alter column type > > are some examples). > > This can be done CONCURRENTLY too, using the same logical replication > > approach, or do I miss something? > > Yes, the logical replication can potentially be used in other cases. > > > I'm not saying we must do it immediately, this should be a separate > > thread, but we can do some preparation work here. > > > > I can see that a bunch of functions which are currently placed in > > cluster.c can be moved to something like > > logical_rewrite_heap.c. ConcurrentChange struct and > > apply_concurrent_insert function is one example of such. > > > > So, if this is the case, 0003 patch can be splitted in two: > > The first one is general utility code for logical table rewrite > > The second one with actual VACUUM CONCURRENTLY feature. > > > What do you think? > > I can imagine moving the function process_concurrent_changes() and subroutines > to a different file (e.g. 
rewriteheap.c), but moving it into a separate diff > that does not contain any call of the function makes little sense to me. Such > a diff would not add any useful functionality and could not be considered > refactoring either. > > So far I at least moved some code to separate diffs: 0003 and 0005. I'll move > more if I find sensible opportunity in the future. > > -- > Antonin Houska > Web: https://www.cybertec-postgresql.com >

Thanks for working on this, I think this is a very useful feature.

The patch doesn't compile in the debug build, with these errors:

../postgres/src/backend/commands/cluster.c: In function ‘get_catalog_state’:
../postgres/src/backend/commands/cluster.c:2771:33: error: declaration of ‘td_src’ shadows a previous local [-Werror=shadow=compatible-local]
 2771 | TupleDesc td_src, td_dst;
      |           ^~~~~~
../postgres/src/backend/commands/cluster.c:2741:25: note: shadowed declaration is here
 2741 | TupleDesc td_src = RelationGetDescr(rel);

You forgot the meson build for pgoutput_cluster:

diff --git a/src/backend/meson.build b/src/backend/meson.build
index 78c5726814..0f9141a4ac 100644
--- a/src/backend/meson.build
+++ b/src/backend/meson.build
@@ -194,5 +194,6 @@ pg_test_mod_args = pg_mod_args + {
 subdir('jit/llvm')
 subdir('replication/libpqwalreceiver')
 subdir('replication/pgoutput')
+subdir('replication/pgoutput_cluster')

I noticed that you use lmode/lock_mode/lockmode. There are lmode and lockmode in the codebase, but I remember someone proposed changing them all to lockmode; how about sticking to lockmode in your patch?

0004:

+ sure that the old files do not change during the processing because the
+ chnages would get lost due to the swap.

typo

+ files. The data changes that took place during the creation of the new
+ table and index files are captured using logical decoding
+ (<xref linkend="logicaldecoding"/>) and applied before
+ the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
+ is typically held only for the time needed to swap the files, which
+ should be pretty short.

I remember pg_squeeze also did some logical decoding after getting the exclusive lock. If that is still true, I guess the doc above is not precise.

+ Note that <command>CLUSTER</command> with the
+ the <literal>CONCURRENTLY</literal> option does not try to order the
+ rows inserted into the table after the clustering started.

Do you mean after the *logical decoding* started here? If CLUSTER CONCURRENTLY does not order rows at all, why bother implementing it?

+ errhint("CLUSTER CONCURRENTLY is only allowed for permanent relations")));

errhint messages should end with a dot. Why is it hardcoded to "CLUSTER CONCURRENTLY" instead of using the parameter *stmt*?

+ ResourceOwner oldowner = CurrentResourceOwner;
+
+ /*
+ * In the CONCURRENT case, do the planning in a subtrensaction so that

typo

I did not see VacuumStmt changes in gram.y, so how are we supposed to use VACUUM FULL CONCURRENTLY? I tried the following, but without success.

[local] postgres@demo:5432-36097=# vacuum (concurrently) aircrafts_data;
ERROR: CONCURRENTLY can only be specified with VACUUM FULL
[local] postgres@demo:5432-36097=# vacuum full (concurrently) full aircrafts_data;
ERROR: syntax error at or near "("
LINE 1: vacuum full (concurrently) full aircrafts_data;

--
Regards
Junwang Zhao
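For readers unfamiliar with the warning class behind the build failure reported above: -Wshadow=compatible-local fires when an inner block declares a local with the same name and a compatible type as an outer local. The sketch below is a hypothetical reproduction, unrelated to the actual get_catalog_state() code; the redeclaration in the patch may well have been harmless, whereas this example is constructed so that the shadowing changes the result.

```c
#include <assert.h>

/*
 * The inner 'count' shadows the outer one, so the increments never reach
 * the variable that is returned. GCC reports this with
 * -Wshadow=compatible-local (an error when combined with -Werror).
 */
static int
count_positive_shadowed(const int *vals, int n)
{
    int count = 0;

    for (int i = 0; i < n; i++)
    {
        if (vals[i] > 0)
        {
            int count = 0;      /* new variable, same name and type */

            count++;            /* updates the inner variable only */
            (void) count;
        }
    }
    return count;               /* the outer variable is still 0 */
}

/* Same loop without the redeclaration: the outer variable is updated. */
static int
count_positive_fixed(const int *vals, int n)
{
    int count = 0;

    for (int i = 0; i < n; i++)
    {
        if (vals[i] > 0)
            count++;
    }
    return count;
}
```

Compiling the first function with `gcc -Wshadow=compatible-local` should produce a diagnostic of the same shape as the one quoted in the review. Without that flag it compiles silently, which would explain why a differently configured build did not complain.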
On Wed, Sep 4, 2024 at 7:41 PM Antonin Houska <ah@cybertec.at> wrote:
>
> Junwang Zhao <zhjwpku@gmail.com> wrote:
>
> > Thanks for working on this, I think this is a very useful feature.
> >
> > The patch doesn't compile in the debug build with errors:
> >
> > ../postgres/src/backend/commands/cluster.c: In function ‘get_catalog_state’:
> > ../postgres/src/backend/commands/cluster.c:2771:33: error: declaration
> > of ‘td_src’ shadows a previous local [-Werror=shadow=compatible-local]
> > 2771 | TupleDesc td_src, td_dst;
> > | ^~~~~~
> > ../postgres/src/backend/commands/cluster.c:2741:25: note: shadowed
> > declaration is here
> > 2741 | TupleDesc td_src = RelationGetDescr(rel);
>
> ok, gcc14 complains here, the compiler I used before did not. Fixed.
>
> > you forgot the meson build for pgoutput_cluster
> >
> > diff --git a/src/backend/meson.build b/src/backend/meson.build
> > index 78c5726814..0f9141a4ac 100644
> > --- a/src/backend/meson.build
> > +++ b/src/backend/meson.build
> > @@ -194,5 +194,6 @@ pg_test_mod_args = pg_mod_args + {
> >  subdir('jit/llvm')
> >  subdir('replication/libpqwalreceiver')
> >  subdir('replication/pgoutput')
> > +subdir('replication/pgoutput_cluster')
>
> Fixed, thanks. That might be the reason for the cfbot to fail when using
> meson.
>
> > I noticed that you use lmode/lock_mode/lockmode, there are lmode and lockmode
> > in the codebase, but I remember someone proposed all changes to lockmode, how
> > about sticking to lockmode in your patch?
>
> Fixed.
>
> > 0004:
> >
> > + sure that the old files do not change during the processing because the
> > + chnages would get lost due to the swap.
> > typo
>
> Fixed.
>
> >
> > + files. The data changes that took place during the creation of the new
> > + table and index files are captured using logical decoding
> > + (<xref linkend="logicaldecoding"/>) and applied before
> > + the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
> > + is typically held only for the time needed to swap the files, which
> > + should be pretty short.
> >
> > I remember pg_squeeze also did some logical decoding after getting the exclusive
> > lock, if that is still true, I guess the doc above is not precise.
>
> The decoding takes place before requesting the lock, as well as after
> that. I've adjusted the paragraph, see 0007.
>
> > + Note that <command>CLUSTER</command> with the
> > + the <literal>CONCURRENTLY</literal> option does not try to order the
> > + rows inserted into the table after the clustering started.
> >
> > Do you mean after the *logical decoding* started here? If CLUSTER CONCURRENTLY
> > does not order rows at all, why bother implementing it?
>
> The rows inserted before CLUSTER (CONCURRENTLY) started do get ordered, the
> rows inserted after that do not. (Actually what matters is when the snapshot
> for the initial load is created, but that happens in very early stage of the
> processing. Not sure if user is interested in such implementation details.)
> >
> > + errhint("CLUSTER CONCURRENTLY is only allowed for permanent relations")));
> >
> > errhint messages should end with a dot. Why hardcoded to "CLUSTER CONCURRENTLY"
> > instead of parameter *stmt*.
>
> Fixed.
>
> > + ResourceOwner oldowner = CurrentResourceOwner;
> > +
> > + /*
> > + * In the CONCURRENT case, do the planning in a subtrensaction so that
> > typo
>
> Fixed.
>
> > I did not see VacuumStmt changes in gram.y, how do we suppose to
> > use the vacuum full concurrently? I tried the following but no success.
>
> With the "parenthesized syntax", new options can be added w/o changing
> gram.y. (While the "unparenthesized syntax" is deprecated.)
>
> > [local] postgres@demo:5432-36097=# vacuum (concurrently) aircrafts_data;
> > ERROR: CONCURRENTLY can only be specified with VACUUM FULL
>
> The "lazy" VACUUM works concurrently as such.
>
> > [local] postgres@demo:5432-36097=# vacuum full (concurrently) full
> > aircrafts_data;
> > ERROR: syntax error at or near "("
> > LINE 1: vacuum full (concurrently) full aircrafts_data;
>
> This is not specific to the CONCURRENTLY option. For example:
>
> postgres=# vacuum full (analyze) full aircrafts_data;
> ERROR: syntax error at or near "("
> LINE 1: vacuum full (analyze) full aircrafts_data;
>
> (You seem to combine the parenthesized syntax with the unparenthesized.)

Yeah, my mistake, *vacuum (full, concurrently)* works.

+ if (TransactionIdIsNormal(HeapTupleHeaderGetRawXmax(tuple->t_data)) &&
+     HeapTupleMVCCNotDeleted(tuple, snapshot, buffer))
+ {
+     /* TODO More work needed here?*/
+     tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+     HeapTupleHeaderSetXmax(tuple->t_data, 0);
+ }

I don't quite understand the above code, IIUC xmax and xmax invalid
are set directly on the buffer page. What if the command failed? Will
this break the visibility rules?

btw, v4-0006 failed to apply.

>
> --
> Antonin Houska
> Web: https://www.cybertec-postgresql.com
>

--
Regards
Junwang Zhao
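
Note on the ordering behaviour described in the reply quoted above (rows present when the initial-load snapshot is taken get ordered, rows inserted afterwards do not): the toy C model below may help picture the resulting layout. It is a sketch under strong assumptions, not the patch's implementation. Rows are reduced to integer keys, only inserts are modelled (no updates or deletes), logical decoding is not modelled at all, and toy_cluster_concurrently, n_snapshot and n_total are hypothetical names.

```c
#include <stdlib.h>
#include <string.h>

/* Three-way comparison of two ints for qsort(). */
static int
cmp_int(const void *a, const void *b)
{
    int         x = *(const int *) a;
    int         y = *(const int *) b;

    return (x > y) - (x < y);
}

/*
 * old_keys[0 .. n_snapshot-1]       rows visible to the initial-load snapshot
 * old_keys[n_snapshot .. n_total-1] rows inserted later, in arrival order
 *
 * new_keys must have room for n_total entries; n_snapshot <= n_total is
 * assumed and not checked.
 */
void
toy_cluster_concurrently(const int *old_keys, size_t n_snapshot,
                         size_t n_total, int *new_keys)
{
    /* Initial load: copy the snapshot-visible rows and order them. */
    memcpy(new_keys, old_keys, n_snapshot * sizeof(int));
    qsort(new_keys, n_snapshot, sizeof(int), cmp_int);

    /* Catch-up: append the later rows as they arrived, unsorted. */
    memcpy(new_keys + n_snapshot, old_keys + n_snapshot,
           (n_total - n_snapshot) * sizeof(int));
}
```

In this model the new file starts with a sorted prefix followed by an unsorted tail, which matches the description that only rows existing before the command started are clustered.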