Re: In-placre persistance change of a relation - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: In-placre persistance change of a relation
Msg-id 20210112.185808.926258436521829536.horikyota.ntt@gmail.com
In response to Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
List pgsql-hackers
At Fri, 08 Jan 2021 17:52:21 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> At Fri, 08 Jan 2021 14:47:05 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> > In this version, RelationChangePersistence() no longer chooses the
> > in-place method for indexes other than btree. The method seems usable
> > with all kinds of indexes other than GiST, but at the moment it applies
> > only to btrees.
> > 
> > 1: https://www.postgresql.org/message-id/CA+TgmoZEZ5RONS49C7mEpjhjndqMQtVrz_LCQUkpRWdmRevDnQ@mail.gmail.com
> 
> Hmm. This is not working correctly. I'll repost after fixing that.

I think I fixed the misbehavior. ResetUnloggedRelationsInDbspaceDir()
was handling file operations in the wrong order and with the wrong
logic. It also needed to drop buffers and forget fsync requests.

I thought that the two cases this patch is expected to fix (orphan
relation files and uncommitted init files) could share the same
"cleanup" fork, but that is wrong. I had to add one more fork to
differentiate the case of SET UNLOGGED from the case of creating
UNLOGGED tables...
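
The distinction between the two sentinel forks can be modeled roughly as
below. This is only a sketch, not code from the patch: the ForkSet bitmask
and all model_* functions are hypothetical names invented for illustration,
and the recovery rule is an assumption inferred from the comments in
RelationCreateStorage() and RelationCreateInitFork(), since the actual
logic lives in reinit.c.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model: one bit per fork file of a single relation (hypothetical). */
enum
{
    FORK_MAIN     = 1 << 0,
    FORK_INIT     = 1 << 1,
    FORK_CLEANUP  = 1 << 2,     /* sentinel: init fork added by SET UNLOGGED */
    FORK_CLEANUP2 = 1 << 3      /* sentinel: relation created in this xact */
};

typedef unsigned int ForkSet;

/* CREATE [UNLOGGED] TABLE: the sentinel is created before the data files. */
static ForkSet
model_create(bool unlogged)
{
    ForkSet     f = FORK_CLEANUP2;

    f |= FORK_MAIN;
    if (unlogged)
        f |= FORK_INIT;
    return f;
}

/* ALTER TABLE ... SET UNLOGGED on a relation that already exists. */
static ForkSet
model_set_unlogged(ForkSet f)
{
    assert(f & FORK_MAIN);      /* only meaningful for existing storage */
    return f | FORK_CLEANUP | FORK_INIT;
}

/* Commit: the pending unlinks of the sentinel forks are executed. */
static ForkSet
model_commit(ForkSet f)
{
    return f & ~(FORK_CLEANUP | FORK_CLEANUP2);
}

/* Crash recovery: decide what to keep from the sentinel that survived. */
static ForkSet
model_recover(ForkSet f)
{
    if (f & FORK_CLEANUP2)
        return 0;               /* orphan relation: remove every fork */
    if (f & FORK_CLEANUP)
        return f & ~(FORK_INIT | FORK_CLEANUP); /* uncommitted init fork */
    return f;                   /* nothing was in progress */
}
```

With a single shared sentinel, recovery could not tell "remove the whole
relation" apart from "remove only the init fork"; for example,
model_recover(model_set_unlogged(FORK_MAIN)) has to keep the main fork,
while model_recover(model_create(true)) has to remove everything.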

Attached is a new version that seems to work correctly but looks
somewhat messy. I'll keep working on it.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 88e9374529cbd8f983f2c82baadea94b475e46dd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v4 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, it currently runs a heap rewrite, which causes a large amount
of file I/O.  This patch makes the command run without a heap rewrite.
In addition, SET LOGGED while wal_level > minimal emits WAL using
XLOG_FPI instead of a massive number of HEAP_INSERTs, which should be
smaller.

This also allows cleaning up files left behind when the transaction
that created them crashes.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  23 ++
 src/backend/access/transam/README      |   8 +
 src/backend/access/transam/xlog.c      |  17 +
 src/backend/catalog/storage.c          | 436 +++++++++++++++++++++++--
 src/backend/commands/tablecmds.c       | 246 +++++++++++---
 src/backend/storage/buffer/bufmgr.c    |  88 +++++
 src/backend/storage/file/reinit.c      | 322 ++++++++++++------
 src/backend/storage/smgr/md.c          |  13 +-
 src/backend/storage/smgr/smgr.c        |   6 +
 src/common/relpath.c                   |   4 +-
 src/include/catalog/storage.h          |   2 +
 src/include/catalog/storage_xlog.h     |  22 +-
 src/include/common/relpath.h           |   6 +-
 src/include/storage/bufmgr.h           |   2 +
 src/include/storage/md.h               |   2 +
 src/include/storage/reinit.h           |   3 +-
 src/include/storage/smgr.h             |   1 +
 17 files changed, 1034 insertions(+), 167 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7755553d57..2c109b8ca4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,23 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +72,12 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..547107a771 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -724,6 +724,14 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+The CLEANUP fork file
+--------------------------------
+
+A CLEANUP fork is created when a new relation file is created to mark
+that the relfilenode needs to be cleaned up at recovery time.  In contrast
+to 4 above, failure to remove a CLEANUP fork file will lead to data
+loss, in which case the server will shut down.
+
 
 Skipping WAL for New RelFileNode
 --------------------------------
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b18257c198..6dcbcbe387 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "catalog/storage.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
@@ -4442,6 +4443,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
             {
                 ereport(DEBUG1,
                         (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+                /* cleanup garbage files left during crash recovery */
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+                /* run rollback cleanup if any */
+                smgrDoPendingDeletes(false);
+
                 InArchiveRecovery = true;
                 if (StandbyModeRequested)
                     StandbyMode = true;
@@ -7455,6 +7464,14 @@ StartupXLOG(void)
                 }
             }
 
+            /* cleanup garbage files left during crash recovery */
+            if (!InArchiveRecovery)
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+            /* run rollback cleanup if any */
+            smgrDoPendingDeletes(false);
+
             /* Allow resource managers to do any required cleanup. */
             for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
             {
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index cba7a9ada0..c54d70747f 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -27,6 +28,7 @@
 #include "access/xlogutils.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
+#include "common/hashfn.h"
 #include "miscadmin.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
@@ -57,9 +59,16 @@ int            wal_skip_threshold = 2048;    /* in kilobytes */
  * but I'm being paranoid.
  */
 
+#define    PDOP_DELETE                (0)
+#define    PDOP_UNLINK_FORK        (1 << 0)
+#define    PDOP_SET_PERSISTENCE    (1 << 1)
+
 typedef struct PendingRelDelete
 {
     RelFileNode relnode;        /* relation that may need to be deleted */
+    int            op;                /* operation mask */
+    bool        bufpersistence;    /* buffer persistence to set */
+    int            unlink_forknum;    /* forknum to unlink */
     BackendId    backend;        /* InvalidBackendId if not a temp rel */
     bool        atCommit;        /* T=delete at commit; F=delete at abort */
     int            nestLevel;        /* xact nesting level of request */
@@ -75,6 +84,24 @@ typedef struct PendingRelSync
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 HTAB       *pendingSyncHash = NULL;
 
+typedef struct SRelHashEntry
+{
+    SMgrRelation  srel;
+    char          status;    /* for simplehash use */
+} SRelHashEntry;
+
+/* define hashtable for workarea for pending deletes */
+#define SH_PREFIX srelhash
+#define SH_ELEMENT_TYPE SRelHashEntry
+#define SH_KEY_TYPE SMgrRelation
+#define SH_KEY srel
+#define SH_HASH_KEY(tb, key) \
+    hash_bytes((unsigned char *)&key, sizeof(SMgrRelation))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(SMgrRelation)) == 0)
+#define SH_SCOPE static inline
+#define SH_DEFINE
+#define SH_DECLARE
+#include "lib/simplehash.h"
 
 /*
  * AddPendingSync
@@ -143,7 +170,17 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
             return NULL;        /* placate compiler */
     }
 
+    /*
+     * We are going to create a new storage file. If server crashes before the
+     * current transaction ends the file needs to be cleaned up but there's no
+     * clue to the orphan files. The cleanup fork works as the sentinel to
+     * identify that situation.
+     */
     srel = smgropen(rnode, backend);
+    smgrcreate(srel, CLEANUP2_FORKNUM, false);
+    log_smgrcreate(&rnode, CLEANUP2_FORKNUM);
+    smgrimmedsync(srel, CLEANUP2_FORKNUM);
+
     smgrcreate(srel, MAIN_FORKNUM, false);
 
     if (needs_wal)
@@ -153,12 +190,25 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rnode;
+    pending->op = PDOP_DELETE;
     pending->backend = backend;
     pending->atCommit = false;    /* delete if abort */
     pending->nestLevel = GetCurrentTransactionNestLevel();
     pending->next = pendingDeletes;
     pendingDeletes = pending;
 
+    /* drop cleanup fork at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = CLEANUP2_FORKNUM;
+    pending->backend = backend;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
     if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
     {
         Assert(backend == InvalidBackendId);
@@ -168,6 +218,218 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    SMgrRelation srel;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, false, false);
+
+    /*
+     * If we have entries for init-fork operation of this relation, that means
+     * that we have already registered pending delete entries to drop
+     * preexisting init fork since before the current transaction started. This
+     * function reverts that change just by removing the entries.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->op != PDOP_DELETE &&
+            ((pending->op & PDOP_UNLINK_FORK) != 0 &&
+             pending->unlink_forknum == CLEANUP_FORKNUM))
+        {
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            create = false;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (!create)
+        return;
+
+    /*
+     * We are going to create the init fork. If the server crashes before the
+     * current transaction ends, the init fork left behind corrupts data during
+     * recovery.  The cleanup fork works as the sentinel to identify that
+     * situation.
+     */
+    srel = smgropen(rnode, InvalidBackendId);
+    smgrcreate(srel, CLEANUP_FORKNUM, false);
+    log_smgrcreate(&rnode, CLEANUP_FORKNUM);
+    smgrimmedsync(srel, CLEANUP_FORKNUM);
+
+    /* We don't have existing init fork, create it. */
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * index-init fork needs further initialization. ambuildempty should do
+     * WAL-log and file sync by itself but otherwise we do that ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /* drop this init fork file at abort and revert persistence */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK | PDOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop cleanup fork at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = CLEANUP_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop cleanup fork at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = CLEANUP_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingDeletes at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, true, false);
+
+    /*
+     * If we have entries for init-fork operation of this relation, that means
+     * that we have created the init fork in the current transaction.  We
+     * remove the init and cleanup forks immediately in that case.  Otherwise
+     * just register pending-delete for the existing init fork.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->op != PDOP_DELETE &&
+            ((pending->op & PDOP_UNLINK_FORK) != 0 &&
+             pending->unlink_forknum == CLEANUP_FORKNUM))
+        {
+            /* unlink list entry */
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            inxact_created = true;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+        /*
+         * INIT/CLEANUP forks are never loaded into shared buffers, so there is
+         * no point in dropping buffers for these files.
+         */
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        log_smgrunlink(&rnode, CLEANUP_FORKNUM);
+        smgrunlink(srel, CLEANUP_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* revert buffer-persistence changes at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +449,44 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file unlink.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the change of buffer persistence.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +500,7 @@ RelationDropStorage(Relation rel)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rel->rd_node;
+    pending->op = PDOP_DELETE;
     pending->backend = rel->rd_backend;
     pending->atCommit = true;    /* delete if commit */
     pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -602,59 +903,97 @@ smgrDoPendingDeletes(bool isCommit)
     int            nrels = 0,
                 maxrels = 0;
     SMgrRelation *srels = NULL;
+    srelhash_hash  *close_srels = NULL;
+    bool            found;
 
     prev = NULL;
     for (pending = pendingDeletes; pending != NULL; pending = next)
     {
+        SMgrRelation srel;
+
         next = pending->next;
         if (pending->nestLevel < nestLevel)
         {
             /* outer-level entries should not be processed yet */
             prev = pending;
+            continue;
         }
+
+        /* unlink list entry first, so we don't retry on failure */
+        if (prev)
+            prev->next = next;
         else
+            pendingDeletes = next;
+
+        if (pending->atCommit != isCommit)
         {
-            /* unlink list entry first, so we don't retry on failure */
-            if (prev)
-                prev->next = next;
-            else
-                pendingDeletes = next;
-            /* do deletion if called for */
-            if (pending->atCommit == isCommit)
-            {
-                SMgrRelation srel;
-
-                srel = smgropen(pending->relnode, pending->backend);
-
-                /* allocate the initial array, or extend it, if needed */
-                if (maxrels == 0)
-                {
-                    maxrels = 8;
-                    srels = palloc(sizeof(SMgrRelation) * maxrels);
-                }
-                else if (maxrels <= nrels)
-                {
-                    maxrels *= 2;
-                    srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-                }
-
-                srels[nrels++] = srel;
-            }
             /* must explicitly free the list entry */
             pfree(pending);
             /* prev does not change */
+            continue;
+        }
+
+        if (close_srels == NULL)
+            close_srels = srelhash_create(CurrentMemoryContext, 32, NULL);
+
+        srel = smgropen(pending->relnode, pending->backend);
+
+        /* Uniquify the smgr relations */
+        srelhash_insert(close_srels, srel, &found);
+
+        if (pending->op != PDOP_DELETE)
+        {
+            if (pending->op & PDOP_UNLINK_FORK)
+            {
+                /* other forks would need their buffers dropped */
+                Assert(pending->unlink_forknum == INIT_FORKNUM ||
+                       pending->unlink_forknum == CLEANUP_FORKNUM ||
+                       pending->unlink_forknum == CLEANUP2_FORKNUM);
+
+                log_smgrunlink(&pending->relnode, pending->unlink_forknum);
+                smgrunlink(srel, pending->unlink_forknum, false);
+
+            }
+
+            if (pending->op & PDOP_SET_PERSISTENCE)
+                SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                              InRecovery);
+        }
+        else
+        {
+            /* allocate the initial array, or extend it, if needed */
+            if (maxrels == 0)
+            {
+                maxrels = 8;
+                srels = palloc(sizeof(SMgrRelation) * maxrels);
+            }
+            else if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            srels[nrels++] = srel;
         }
     }
 
     if (nrels > 0)
     {
         smgrdounlinkall(srels, nrels, false);
-
-        for (int i = 0; i < nrels; i++)
-            smgrclose(srels[i]);
-
         pfree(srels);
     }
+
+    if (close_srels)
+    {
+        srelhash_iterator i;
+        SRelHashEntry     *ent;
+
+        /* close smgr relations */
+        srelhash_start_iterate(close_srels, &i);
+        while ((ent = srelhash_iterate(close_srels, &i)) != NULL)
+            smgrclose(ent->srel);
+        srelhash_destroy(close_srels);
+    }
 }
 
 /*
@@ -824,7 +1163,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId
+            && pending->op == PDOP_DELETE)
             nrels++;
     }
     if (nrels == 0)
@@ -837,7 +1177,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId &&
+            pending->op == PDOP_DELETE)
         {
             *rptr = pending->relnode;
             rptr++;
@@ -917,6 +1258,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrunlink(reln, xlrec->forkNum, true);
+        smgrclose(reln);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1005,6 +1355,28 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingRelDelete *pending;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+        /* revert buffer-persistence changes at abort */
+        pending = (PendingRelDelete *)
+            MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+        pending->relnode = xlrec->rnode;
+        pending->op = PDOP_SET_PERSISTENCE;
+        pending->bufpersistence = !xlrec->persistence;
+        pending->backend = InvalidBackendId;
+        pending->atCommit = false;
+        pending->nestLevel = GetCurrentTransactionNestLevel();
+        pending->next = pendingDeletes;
+        pendingDeletes = pending;
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 993da56d43..37a15d31ee 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -50,6 +50,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -4917,6 +4918,170 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation     rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    int            i;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * If the condition asserted below did not hold, we would need to call
+     * ATRewriteTable.  That cannot happen in the AT_REWRITE_ALTER_PERSISTENCE case.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform in-place persistence change");
+
+    RelationOpenSmgr(rel);
+
+    /*
+     * First we collect all relations that we need to change persistence.
+     */
+
+    /* Collect OIDs of indexes and toast relations */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(rel->rd_id, relids);
+
+    /* Add toast relation if any */
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        List    *toastidx;
+        Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+        RelationOpenSmgr(toastrel);
+        relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+        toastidx = RelationGetIndexList(toastrel);
+        relids = list_concat(relids, toastidx);
+        pfree(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, NoLock);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach (lc_oid, relids)
+    {
+        Oid reloid = lfirst_oid(lc_oid);
+        Relation r = relation_open(reloid, lockmode);
+        
+        /* 
+         * Some access methods do not accept in-place persistence change. For
+         * example, GiST uses page LSNs to figure out whether a block has
+         * changed, where UNLOGGED GiST indexes use fake LSNs that are
+         * incompatible with real LSNs used for LOGGED ones.
+         *
+         * XXXX: We don't bother allowing in-place persistence change for index
+         * methods other than btree for now.
+         */
+        if (r->rd_rel->relkind == RELKIND_INDEX &&
+            r->rd_rel->relam != BTREE_AM_OID)
+        {
+            int            reindex_flags;
+
+            /* reindex doesn't allow concurrent use of the index */
+            table_close(r, NoLock);
+
+            reindex_flags =
+                REINDEX_REL_SUPPRESS_INDEX_USE |
+                REINDEX_REL_CHECK_CONSTRAINTS;
+
+            /* Set the same persistence as the parent relation. */
+            if (persistence == RELPERSISTENCE_UNLOGGED)
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+            else
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+            reindex_index(reloid, reindex_flags, persistence, 0);
+
+            continue;
+        }
+
+        RelationOpenSmgr(r);
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation gets WAL-logged, immediately sync all files but
+         * initfork to establish the initial state on storage.  Buffers have
+         * already been flushed out by RelationCreate(Drop)InitFork called just
+         * above. Initfork should have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (i = 0 ; i < INIT_FORKNUM ; i++)
+            {
+                if (smgrexists(r->rd_smgr, i))
+                    smgrimmedsync(r->rd_smgr, i);
+            }
+        }
+
+        /* Update catalog */
+        tuple = SearchSysCacheCopy1(RELOID,    ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+
+        /*
+         * While wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged to recover the table.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            ForkNumber fork;
+
+            for (fork = 0; fork < INIT_FORKNUM ; fork++)
+            {
+                if (smgrexists(r->rd_smgr, fork))
+                    log_newpage_range(r, fork,
+                                      0, smgrnblocks(r->rd_smgr, fork), false);
+            }
+        }
+
+        table_close(r, NoLock);
+    }
+
+    table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5037,45 +5202,52 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
-                                       lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
+                                           lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
+            }
         }
         else
         {
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 71b5852224..b730b4417c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3032,6 +3033,93 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelationBuffersPersistence
+ *
+ *        This function changes the persistence of all buffer pages of a relation.
+ *        When switching to PERMANENT, it also writes all dirty pages of the
+ *        relation out to disk (or more accurately, out to kernel disk buffers),
+ *        ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently it sequentially searches the buffer pool, should be
+ *        changed to more clever ways of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert(!RelFileNodeBackendIsTemp(rnode));
+
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* init fork is always BM_PERMANENT. See BufferAlloc */
+            if (bufHdr->tag.forkNum != INIT_FORKNUM)
+                buf_state &= ~BM_PERMANENT;
+
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 40c758d789..b07709bc4f 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,50 @@
 
 #include <unistd.h>
 
+#include "access/xlog.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-                                                  int op);
+                                                  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-                                               int op);
+                                               Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
     Oid            reloid;            /* hash key */
-} unlogged_relation_entry;
+    bool        has_init;        /* has INIT fork */
+    bool        dirty_init;        /* needs to remove INIT fork */
+    bool        dirty_all;        /* needs to remove all forks */
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of the "cleanup" forks.
  *
+ * If CLEANUP_FORKNUM (clup) is present, we remove the init fork of the same
+ * relation along with the clup fork.
+ *
+ * If CLEANUP2_FORKNUM (cln2) is present, we remove the whole relation along
+ * with the cln2 fork.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have the "cleanup" and/or the "init" forks.
+ *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
  */
@@ -68,7 +89,7 @@ ResetUnloggedRelations(int op)
     /*
      * First process unlogged files in pg_default ($PGDATA/base)
      */
-    ResetUnloggedRelationsInTablespaceDir("base", op);
+    ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
     /*
      * Cycle through directories for all non-default tablespaces.
@@ -77,13 +98,19 @@ ResetUnloggedRelations(int op)
 
     while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
     {
+        Oid tspid;
+
         if (strcmp(spc_de->d_name, ".") == 0 ||
             strcmp(spc_de->d_name, "..") == 0)
             continue;
 
         snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
                  spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-        ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+        tspid = atooid(spc_de->d_name);
+        Assert(tspid != 0);
+
+        ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
     }
 
     FreeDir(spc_dir);
@@ -99,7 +126,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+                                      Oid tspid, int op)
 {
     DIR           *ts_dir;
     struct dirent *de;
@@ -126,6 +154,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
     while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
     {
+        Oid dbid;
+
         /*
          * We're only interested in the per-database directories, which have
          * numeric names.  Note that this code will also (properly) ignore "."
@@ -136,7 +166,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
         snprintf(dbspace_path, sizeof(dbspace_path), "%s/%s",
                  tsdirname, de->d_name);
-        ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+        dbid = atooid(de->d_name);
+        Assert(dbid != 0);
+
+        ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
     }
 
     FreeDir(ts_dir);
@@ -146,125 +179,232 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+                                   Oid tspid, Oid dbid, int op)
 {
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
-    Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+    Assert((op & (UNLOGGED_RELATION_CLEANUP |
+                  UNLOGGED_RELATION_DROP_BUFFER |
+                  UNLOGGED_RELATION_INIT)) != 0);
 
     /*
      * Cleanup is a two-pass operation.  First, we go through and identify all
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create a ton of unlogged relations
+     * in the same database & tablespace, so we'd better use a hash table
+     * rather than an array or linked list to keep track of which files
+     * need to be reset.  Otherwise, this cleanup operation would be
+     * O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(Oid);
+    ctl.entrysize = sizeof(relfile_entry);
+    hash = hash_create("relfilenode cleanup hash",
+                       32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+    /* Collect INIT and CLEANUP forks in the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        int            oidchars;
+        ForkNumber    forkNum;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum))
+            continue;
+
+        if (forkNum == INIT_FORKNUM ||
+            forkNum == CLEANUP_FORKNUM || forkNum == CLEANUP2_FORKNUM)
+        {
+            Oid                key;
+            relfile_entry  *ent;
+            bool            found;
+
+            /*
+             * Record the relfilenode information. If it has the CLEANUP fork,
+             * the relfilenode is in dirty state, where clean up is needed.
+             */
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+            if (!found)
+            {
+                ent->has_init = false;
+                ent->dirty_init = false;
+                ent->dirty_all = false;
+            }
+
+            if (forkNum == CLEANUP_FORKNUM)
+                ent->dirty_init = true;
+            else if (forkNum == CLEANUP2_FORKNUM)
+                ent->dirty_all = true;
+            else
+            {
+                Assert(forkNum == INIT_FORKNUM);
+                ent->has_init = true;
+            }
+        }
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /* Nothing to do if we have neither init nor cleanup forks. */
+    if (hash_get_num_entries(hash) < 1)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
+    if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+    {
+        /*
+         * When we come here after recovery, an smgr object for this file might
+         * have been created. In that case we need to drop all buffers and then
+         * the smgr object before initializing the unlogged relation.  This is
+         * safe as long as no other backend has accessed the relation before
+         * starting archive recovery.
+         */
+        HASH_SEQ_STATUS status;
+        relfile_entry *ent;
+        SMgrRelation   *srels = palloc(sizeof(SMgrRelation) * 8);
+        int               maxrels = 8;
+        int               nrels = 0;
+        RelFileNodeBackend *rnodes;
+        int i;
+
+        Assert(!HotStandbyActive());
+
+        hash_seq_init(&status, hash);
+        while ((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+        {
+            RelFileNodeBackend rel;
+
+            /*
+             * The relation is persistent and remains persistent. Don't drop
+             * the buffers for this relation.
+             */
+            if (ent->has_init && ent->dirty_init)
+                continue;
+
+            if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = ent->reloid;
+
+            srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+        }
+
+        rnodes = palloc(sizeof(RelFileNodeBackend) * nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            rnodes[i] = srels[i]->smgr_rnode;
+
+        DropRelFileNodesAllBuffers(rnodes, nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            smgrclose(srels[i]);
+    }
+
+    /*
+     * Now, make a second pass and remove anything that matches.
+     */
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        ctl.keysize = sizeof(Oid);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        ctl.hcxt = CurrentMemoryContext;
-        hash = hash_create("unlogged relation OIDs", 32, &ctl,
-                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-        /* Scan the directory. */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            ent.reloid = atooid(de->d_name);
-            (void) hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
-        /*
-         * Now, make a second pass and remove anything that matches.
-         */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
+            ForkNumber        forkNum;
+            int                oidchars;
+            Oid                key;
+            relfile_entry  *ent;
+            RelFileNodeBackend rel;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
                                                      &forkNum))
                 continue;
 
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
-                continue;
-
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.  If so, nuke it!
              */
-            ent.reloid = atooid(de->d_name);
-            if (hash_search(hash, &ent, HASH_FIND, NULL))
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            if (!ent)
+                continue;
+
+            if (!ent->dirty_all)
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
+                /* clean permanent relations don't need cleanup */
+                if (!ent->has_init)
+                    continue;
+
+                if (ent->dirty_init)
+                {
+                    /*
+                     * The crashed transaction did SET UNLOGGED. This relation
+                     * is restored to a LOGGED relation.
+                     */
+                    if (forkNum != INIT_FORKNUM && forkNum != CLEANUP_FORKNUM)
+                        continue;
+                }
                 else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                {
+                    /*
+                     * we don't remove the INIT fork of a non-dirty
+                     * relfilenode
+                     */
+                    if (forkNum == INIT_FORKNUM)
+                        continue;
+                }
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = atooid(de->d_name);
+
+            ForgetRelationForkSyncRequests(rel, forkNum);
         }
 
         /* Cleanup is complete. */
         FreeDir(dbspace_dir);
-        hash_destroy(hash);
     }
 
+    hash_destroy(hash);
+    hash = NULL;
+
     /*
      * Initialization happens after cleanup is complete: we copy each init
-     * fork file to the corresponding main fork file.  Note that if we are
-     * asked to do both cleanup and init, we may never get here: if the
-     * cleanup code determines that there are no init forks in this dbspace,
-     * it will return before we get to this point.
+     * fork file to the corresponding main fork file.
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 0643d714fb..6b37195c52 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -338,8 +338,10 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
         if (ret == 0 || errno != ENOENT)
         {
             ret = unlink(path);
+
+            /* Failure to remove the cleanup fork leads to data loss. */
             if (ret < 0 && errno != ENOENT)
-                ereport(WARNING,
+                ereport((forkNum != CLEANUP_FORKNUM ? WARNING : ERROR),
                         (errcode_for_file_access(),
                          errmsg("could not remove file \"%s\": %m", path)));
         }
@@ -1024,6 +1026,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
     RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+    register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 0f31ff3822..4102d3d59c 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -644,6 +644,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 1f5c426ec0..479dcc248e 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -34,7 +34,9 @@ const char *const forkNames[] = {
     "main",                        /* MAIN_FORKNUM */
     "fsm",                        /* FSM_FORKNUM */
     "vm",                        /* VISIBILITYMAP_FORKNUM */
-    "init"                        /* INIT_FORKNUM */
+    "init",                        /* INIT_FORKNUM */
+    "clup",                        /* CLEANUP_FORKNUM */
+    "cln2"                        /* CLEANUP2_FORKNUM */
 };
 
 StaticAssertDecl(lengthof(forkNames) == (MAX_FORKNUM + 1),
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 0ab32b44e9..382623159c 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index f0814f1458..0fd0832a8b 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -22,13 +22,17 @@
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation, deletion and persistence change
+ * here. Logging of deletion actions is mainly handled by xact.c, because it
+ * is part of transaction commit, but we log deletions that happen outside of
+ * a transaction here.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_BUFPERSISTENCE    0x40
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +40,18 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +67,8 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a44be11ca0..040070aa2b 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -43,7 +43,9 @@ typedef enum ForkNumber
     MAIN_FORKNUM = 0,
     FSM_FORKNUM,
     VISIBILITYMAP_FORKNUM,
-    INIT_FORKNUM
+    INIT_FORKNUM,
+    CLEANUP_FORKNUM,
+    CLEANUP2_FORKNUM
 
     /*
      * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
@@ -52,7 +54,7 @@ typedef enum ForkNumber
      */
 } ForkNumber;
 
-#define MAX_FORKNUM        INIT_FORKNUM
+#define MAX_FORKNUM        CLEANUP2_FORKNUM
 
 #define FORKNAMECHARS    4        /* max chars for a fork name */
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ff6cd0fc54..d9752a8317 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -205,6 +205,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 752b440864..3cbbbf2edd 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -41,6 +41,8 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+                                           ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index fad1e5c473..b969ba8e86 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -23,6 +23,7 @@ extern bool parse_filename_for_nontemp_relation(const char *name,
                                                 int *oidchars, ForkNumber *fork);
 
 #define UNLOGGED_RELATION_CLEANUP        0x0001
-#define UNLOGGED_RELATION_INIT            0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
+#define UNLOGGED_RELATION_INIT            0x0004
 
 #endif                            /* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index ebf4a199dc..8be17d9afc 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,6 +86,7 @@ extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.27.0
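
A side note on the reinit.h hunk above: UNLOGGED_RELATION_INIT moves from
0x0002 to 0x0004, so the new UNLOGGED_RELATION_DROP_BUFFER gets a bit of its
own and the three flags can still be OR-ed together. Below is a minimal
standalone sketch of testing such a mask. The three values are copied from the
hunk; op_has() is a hypothetical helper that is not part of the patch, and the
way the patch's callers actually combine the flags is not shown here.

```c
#include <assert.h>
#include <stdbool.h>

/* Values copied from the patched src/include/storage/reinit.h. */
#define UNLOGGED_RELATION_CLEANUP        0x0001
#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
#define UNLOGGED_RELATION_INIT           0x0004

/*
 * Hypothetical helper (not in the patch): returns true when every bit in
 * "wanted" is also set in "op".
 */
static bool
op_has(int op, int wanted)
{
    return (op & wanted) == wanted;
}
```

Because each flag occupies a distinct bit, a caller could pass for example
UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_DROP_BUFFER and have each
request tested independently.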

From 70d300969fbd2aae6c66b36f6100d3d2516a0dab Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v4 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 src/backend/commands/tablecmds.c | 140 +++++++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c    |  16 ++++
 src/backend/nodes/equalfuncs.c   |  15 ++++
 src/backend/parser/gram.y        |  20 +++++
 src/backend/tcop/utility.c       |  11 +++
 src/include/commands/tablecmds.h |   2 +
 src/include/nodes/nodes.h        |   1 +
 src/include/nodes/parsenodes.h   |   9 ++
 8 files changed, 214 insertions(+)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 37a15d31ee..2f65abb19b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -13696,6 +13696,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change persistence of all objects in a given tablespace in
+ * the current database.  Objects can be chosen based on the owner of the
+ * object also, to allow users to change persistence of only their objects. The
+ * main permissions handling is done by the lower-level change persistence
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(NIL);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * Now that the checks are done, check if we should set the tablespace OID
+     * to InvalidOid because it is our database's default tablespace.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk the list of objects in the tablespace to pick them up. This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick up objects in pg_catalog as part of this; if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * Caller must be considered an owner of the table whose persistence
+         * we're going to change.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+                           NameStr(relForm->relname));
+
+        if (stmt->nowait &&
+            !ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+        else
+            LockRelationOid(relOid, AccessExclusiveLock);
+
+        /*
+         * Add to our list of objects of which we're going to change
+         * persistence.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ba3ccc712c..127da5151d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4138,6 +4138,19 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5441,6 +5454,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index a2ef853dc2..4f13a1762b 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1872,6 +1872,18 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_SCALAR_FIELD(logged);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3494,6 +3506,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 31c95443a5..2222fd8fe3 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1934,6 +1934,26 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 53a511f1da..16606448bf 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -161,6 +161,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1732,6 +1733,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2619,6 +2626,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 08c463d3c4..646928466d 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index caed683ba9..16d91d3e1d 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -424,6 +424,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index dc2bb40926..c3eab6f1ab 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2253,6 +2253,15 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;
+    ObjectType    objtype;        /* Object type to change */
+    bool        logged;
+    bool        nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
-- 
2.27.0
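
To summarize which pg_class rows the scan loop in AlterTableSetLoggedAll()
picks up, here is a minimal standalone sketch of the filter. RelInfo and
should_change_persistence() are hypothetical stand-ins written for
illustration only; the patch itself tests the Form_pg_class fields directly,
and the ownership check and lock acquisition that follow in the loop are
left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the pg_class properties the loop inspects. */
typedef struct RelInfo
{
    bool        in_pg_catalog;  /* IsCatalogNamespace(relnamespace) */
    bool        is_shared;      /* relisshared */
    bool        in_temp_ns;     /* isAnyTempNamespace(relnamespace) */
    bool        in_toast_ns;    /* IsToastNamespace(relnamespace) */
    char        relkind;        /* 'r' corresponds to RELKIND_RELATION */
} RelInfo;

/*
 * Mirrors the two "continue" conditions of the scan loop: skip catalog,
 * shared, temporary and TOAST relations, then keep only ordinary tables.
 */
static bool
should_change_persistence(const RelInfo *rel)
{
    if (rel->in_pg_catalog || rel->is_shared ||
        rel->in_temp_ns || rel->in_toast_ns)
        return false;

    return rel->relkind == 'r';
}
```

An ordinary user table passes the filter, while an index or a TOAST relation
in the same tablespace is skipped; per the comment in the patch, TOAST is
expected to be changed together with its main table.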

