Re: In-placre persistance change of a relation - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: In-placre persistance change of a relation
Date
Msg-id 20201113.132201.148696021363404735.horikyota.ntt@gmail.com
In response to In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses RE: In-placre persistance change of a relation
List pgsql-hackers
Hello.  Before posting the next version, I'd like to explain what this
patch is.

1. The Issue

Bulk data loading is a time-consuming, I/O-intensive task.  Many
DBAs want that task to be faster, even at the cost of an increased
risk of data loss.  wal_level=minimal is one answer to such a
request: data loading into a table that was created in the current
transaction omits WAL-logging, and the table is synced at commit.

However, the optimization doesn't help when data is loaded into
existing tables.  There are quite a few cases where data is loaded
into tables that already contain a lot of data, and those cases get
no benefit from the optimization.

Another possible solution for bulk data-loading is UNLOGGED
tables. But when we switch LOGGED/UNLOGGED of a table, all the table
content is copied to a newly created heap, which is costly.


2. Proposed Solutions

Two proposed solutions are being discussed on this mailing
list.  One is wal_level = none (*1), which omits almost all
WAL-logging.  The other is extending the existing optimization to the
ALTER TABLE SET LOGGED/UNLOGGED cases, which is to be discussed in
this new thread.


3. In-place Persistence Change

The attached is a PoC patch for the latter solution.  When we
want to change table persistence in place, we basically need to do
the following steps.

(the table is exclusively locked)

(1) Flip BM_PERMANENT flag of all shared buffer blocks for the heap.

(2) Create or delete the init fork for existing heap.

(3) Flush all buffers of the relation to file system.

(4) Sync heap files.

(5) Make catalog changes.


4. Transactionality

Steps 1, 2 and 5 above need to be abortable.  Step 5 is rolled back
by the existing infrastructure, and rollback of steps 1 and 2 is
achieved by piggybacking on the pendingDeletes mechanism.


5. Replication

Furthermore, these changes ought to be replicated to standbys.  Catalog
changes are replicated as usual.

On-the-fly creation of the init fork makes a mess of recovery.  Even
though the file is removed at abort, if the server crashes before the
transaction ends, the file is left behind and corrupts the database
during the next recovery.  I looked for a way to create the init fork
in smgrDoPendingDeletes, but that needs the relcache, and the relcache
is not available that late in commit.  In the end, I introduced a fifth
fork kind, "INITTMP" (_itmp), only to signal that the init file is not
yet committed.  I don't like that approach, but it seems to work fine...


6. SQL Command

The second file in the patchset adds a syntax that changes persistence
of all tables in a tablespace.

ALTER TABLE ALL IN TABLESPACE <tsp> SET LOGGED/UNLOGGED [ NOWAIT ];


7. Testing

I tried to write a TAP test for this, but IPC::Run::harness (or
interactive_psql) doesn't seem to work for me.  I'm not sure what
exactly is happening, but pty redirection doesn't work.

  $in = "ls\n"; $out = ""; run ["/usr/bin/bash"], \$in, \$out; print $out;

works but

  $in = "ls\n"; $out = ""; run ["/usr/bin/bash"], '<pty<', \$in, '>pty>', \$out; print $out;

doesn't respond.


The patch is attached.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 05d1971d0f4f0f42899f5d6857892128487eeb40 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v3 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, currently it runs a heap rewrite, which causes a large amount
of file I/O.  This patch makes the command run without a heap rewrite.
In addition, SET LOGGED while wal_level > minimal emits WAL using
XLOG_FPI instead of a massive number of HEAP_INSERTs, which should be
smaller.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  23 ++
 src/backend/catalog/storage.c          | 355 +++++++++++++++++++++++--
 src/backend/commands/tablecmds.c       | 217 ++++++++++++---
 src/backend/storage/buffer/bufmgr.c    |  88 ++++++
 src/backend/storage/file/reinit.c      | 206 ++++++++------
 src/backend/storage/smgr/smgr.c        |   6 +
 src/common/relpath.c                   |   3 +-
 src/include/catalog/storage.h          |   2 +
 src/include/catalog/storage_xlog.h     |  16 ++
 src/include/common/relpath.h           |   5 +-
 src/include/storage/bufmgr.h           |   4 +
 src/include/storage/smgr.h             |   1 +
 12 files changed, 784 insertions(+), 142 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index a7c0cb1bc3..097dacfee6 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,23 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +72,12 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index d538f25726..0f1649758f 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -57,9 +58,19 @@ int            wal_skip_threshold = 2048;    /* in kilobytes */
  * but I'm being paranoid.
  */
 
+
+/* These are bit flags, not ordinal numbers */
+#define    PDOP_DELETE                0x00
+#define    PDOP_UNLINK_FORK        0x01
+#define    PDOP_SET_PERSISTENCE    0x02
+
+
 typedef struct PendingRelDelete
 {
     RelFileNode relnode;        /* relation that may need to be deleted */
+    int            op;                /* operation mask */
+    bool        bufpersistence;    /* buffer persistence to set */
+    int            unlink_forknum;    /* forknum to unlink */
     BackendId    backend;        /* InvalidBackendId if not a temp rel */
     bool        atCommit;        /* T=delete at commit; F=delete at abort */
     int            nestLevel;        /* xact nesting level of request */
@@ -153,6 +164,7 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rnode;
+    pending->op = PDOP_DELETE;
     pending->backend = backend;
     pending->atCommit = false;    /* delete if abort */
     pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -168,6 +180,209 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    SMgrRelation srel;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, false, false);
+
+    /*
+     * If we have entries for init-fork operations on this relation, that means
+     * that we have already registered pending-delete entries to drop an init
+     * fork that existed before the current transaction started. This function
+     * reverts that change just by removing the entries.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->op != PDOP_DELETE)
+        {
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            create = false;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (!create)
+        return;
+
+    /* We don't have existing init fork, create it. */
+    srel = smgropen(rnode, InvalidBackendId);
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * An index's init fork needs further initialization. ambuildempty should
+     * do WAL-logging and file sync by itself; otherwise we do that ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /*
+     * We have created the init fork. If the server crashes before the current
+     * transaction ends, the leftover init fork corrupts data during recovery.
+     * The inittmp fork works as the sentinel to identify that situation.
+     */
+    smgrcreate(srel, INITTMP_FORKNUM, false);
+    log_smgrcreate(&rnode, INITTMP_FORKNUM);
+    smgrimmedsync(srel, INITTMP_FORKNUM);
+
+    /* drop this init fork file at abort and revert persistence */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK | PDOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop inittmp fork at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INITTMP_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop inittmp fork at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INITTMP_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingDeletes at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, true, false);
+
+    /*
+     * If we have entries for init-fork operations on this relation, that means
+     * that we have created the init fork in the current transaction.  We
+     * remove the init and inittmp forks immediately in that case.
+     * Otherwise just register pending-delete for the existing init fork.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->op != PDOP_DELETE)
+        {
+            /* unlink list entry */
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            inxact_created = true;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+        smgrclose(srel);
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        log_smgrunlink(&rnode, INITTMP_FORKNUM);
+        smgrunlink(srel, INITTMP_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* revert buffer-persistence changes at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;    /* revert only if the transaction aborts */
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +402,44 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file unlink.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the buffer persistence change.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +453,7 @@ RelationDropStorage(Relation rel)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rel->rd_node;
+    pending->op = PDOP_DELETE;
     pending->backend = rel->rd_backend;
     pending->atCommit = true;    /* delete if commit */
     pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -606,43 +860,68 @@ smgrDoPendingDeletes(bool isCommit)
     prev = NULL;
     for (pending = pendingDeletes; pending != NULL; pending = next)
     {
+        SMgrRelation srel;
+
         next = pending->next;
         if (pending->nestLevel < nestLevel)
         {
             /* outer-level entries should not be processed yet */
             prev = pending;
+            continue;
         }
+
+        /* unlink list entry first, so we don't retry on failure */
+        if (prev)
+            prev->next = next;
         else
+            pendingDeletes = next;
+
+        if (pending->atCommit != isCommit)
         {
-            /* unlink list entry first, so we don't retry on failure */
-            if (prev)
-                prev->next = next;
-            else
-                pendingDeletes = next;
-            /* do deletion if called for */
-            if (pending->atCommit == isCommit)
-            {
-                SMgrRelation srel;
-
-                srel = smgropen(pending->relnode, pending->backend);
-
-                /* allocate the initial array, or extend it, if needed */
-                if (maxrels == 0)
-                {
-                    maxrels = 8;
-                    srels = palloc(sizeof(SMgrRelation) * maxrels);
-                }
-                else if (maxrels <= nrels)
-                {
-                    maxrels *= 2;
-                    srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-                }
-
-                srels[nrels++] = srel;
-            }
             /* must explicitly free the list entry */
             pfree(pending);
             /* prev does not change */
+            continue;
+        }
+
+        srel = smgropen(pending->relnode, pending->backend);
+
+        if (pending->op != PDOP_DELETE)
+        {
+            if (pending->op & PDOP_UNLINK_FORK)
+            {
+                BlockNumber block = 0;
+                RelFileNodeBackend rbnode;
+
+                rbnode.node = pending->relnode;
+                rbnode.backend = InvalidBackendId;
+
+                DropRelFileNodeBuffers(rbnode, &pending->unlink_forknum, 1,
+                                       &block);
+                smgrclose(srel);
+                log_smgrunlink(&pending->relnode, pending->unlink_forknum);
+                smgrunlink(srel, pending->unlink_forknum, false);
+            }
+
+            if (pending->op & PDOP_SET_PERSISTENCE)
+                SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                              false);
+        }
+        else
+        {
+            /* allocate the initial array, or extend it, if needed */
+            if (maxrels == 0)
+            {
+                maxrels = 8;
+                srels = palloc(sizeof(SMgrRelation) * maxrels);
+            }
+            else if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            srels[nrels++] = srel;
         }
     }
 
@@ -824,7 +1103,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId &&
+            pending->op == PDOP_DELETE)
             nrels++;
     }
     if (nrels == 0)
@@ -837,7 +1117,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId &&
+            pending->op == PDOP_DELETE)
         {
             *rptr = pending->relnode;
             rptr++;
@@ -917,6 +1198,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrclose(reln);
+        smgrunlink(reln, xlrec->forkNum, true);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1005,6 +1295,15 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index e3cfaf8b07..29f786142a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4916,6 +4916,142 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     tab->afterStmts = list_concat(tab->afterStmts, afterStmts);
 
     return newcmd;
+}
+
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation     rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    int            i;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * If any of the following were set, we would need to call ATRewriteTable;
+     * that cannot happen in the AT_REWRITE_ALTER_PERSISTENCE case.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform in-place persistence change");
+
+    RelationOpenSmgr(rel);
+
+    /*
+     * First we collect all relations that we need to change persistence.
+     */
+
+    /* Collect OIDs of indexes and toast relations */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(rel->rd_id, relids);
+
+    /* Add toast relation if any */
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        List    *toastidx;
+        Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+        RelationOpenSmgr(toastrel);
+        relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+        toastidx = RelationGetIndexList(toastrel);
+        relids = list_concat(relids, toastidx);
+        pfree(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, lockmode);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach (lc_oid, relids)
+    {
+        Oid reloid = lfirst_oid(lc_oid);
+        Relation r = relation_open(reloid, lockmode);
+
+        RelationOpenSmgr(r);
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation gets WAL-logged, immediately sync all files but
+         * the init fork to establish the initial state on storage.  Buffers
+         * have already been flushed out by RelationCreate(Drop)InitFork, called
+         * just above. The init fork should have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (i = 0 ; i < INIT_FORKNUM ; i++)
+            {
+                if (smgrexists(r->rd_smgr, i))
+                    smgrimmedsync(r->rd_smgr, i);
+            }
+        }
+
+        /* Update catalog */
+        tuple = SearchSysCacheCopy1(RELOID,    ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+
+        /*
+         * While wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged so that the table can be recovered.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            ForkNumber fork;
+
+            for (fork = 0; fork < INIT_FORKNUM ; fork++)
+            {
+                if (smgrexists(r->rd_smgr, fork))
+                    log_newpage_range(r, fork,
+                                      0, smgrnblocks(r->rd_smgr, fork), false);
+            }
+        }
+
+        table_close(r, NoLock);
+    }
+
+    table_close(classRel, NoLock);
+
+
+
+
 }
 
 /*
@@ -5038,45 +5174,52 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
-                                       lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
+                                           lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
+            }
         }
         else
         {
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ad0d1a9abc..ddd0133cdf 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3033,6 +3034,93 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelationBuffersPersistence
+ *
+ *        This function changes the persistence of all buffer pages of a
+ *        relation and, when switching to PERMANENT, writes all dirty pages
+ *        of the relation out to disk (or more accurately, out to kernel disk
+ *        buffers), ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently this scans the buffer pool sequentially; it should be
+ *        changed to a smarter way of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert (!RelFileNodeBackendIsTemp(rnode));
+
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* init fork is always BM_PERMANENT. See BufferAlloc */
+            if (bufHdr->tag.forkNum != INIT_FORKNUM)
+                buf_state &= ~BM_PERMANENT;
+
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 0c2094f766..6524262a74 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -31,6 +31,7 @@ static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 typedef struct
 {
     char        oid[OIDCHARS + 1];
+    bool        dirty;
 } unlogged_relation_entry;
 
 /*
@@ -151,6 +152,8 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
     Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
@@ -160,62 +163,73 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create a ton of unlogged relations
+     * in the same database & tablespace, so we'd better use a hash table
+     * rather than an array or linked list to keep track of which files
+     * need to be reset.  Otherwise, this cleanup operation would be
+     * O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(unlogged_relation_entry);
+    ctl.entrysize = sizeof(unlogged_relation_entry);
+    hash = hash_create("unlogged hash", 32, &ctl, HASH_ELEM);
+
+    /* Scan the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        ForkNumber    forkNum;
+        int            oidchars;
+        bool        found;
+        unlogged_relation_entry key;
+        unlogged_relation_entry *ent;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum))
+            continue;
+
+        /* Also skip it unless this is the init or inittmp fork. */
+        if (forkNum != INIT_FORKNUM && forkNum != INITTMP_FORKNUM)
+            continue;
+
+        /*
+         * Put the OID portion of the name into the hash table, if it
+         * isn't already.
+         */
+        memset(key.oid, 0, sizeof(key.oid));
+        memcpy(key.oid, de->d_name, oidchars);
+        ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+        if (!found)
+            ent->dirty = false;
+
+        /*
+         * If we have the inittmp fork, the transaction that created the
+         * corresponding init file was neither committed nor aborted. Mark
+         * this init fork as dirty so that we can clean them up properly.
+         */
+        if (forkNum == INITTMP_FORKNUM)
+            ent->dirty = true;
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /*
+     * If we didn't find any init forks, there's no point in continuing;
+     * we can bail out now.
+     */
+    if (hash_get_num_entries(hash) == 0)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        memset(&ctl, 0, sizeof(ctl));
-        ctl.keysize = sizeof(unlogged_relation_entry);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        hash = hash_create("unlogged hash", 32, &ctl, HASH_ELEM);
-
-        /* Scan the directory. */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            memset(ent.oid, 0, sizeof(ent.oid));
-            memcpy(ent.oid, de->d_name, oidchars);
-            hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
         /*
          * Now, make a second pass and remove anything that matches.
          */
@@ -224,39 +238,48 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         {
             ForkNumber    forkNum;
             int            oidchars;
-            bool        found;
-            unlogged_relation_entry ent;
+            unlogged_relation_entry key;
+            unlogged_relation_entry *ent;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
                                                      &forkNum))
                 continue;
 
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
-                continue;
-
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.
              */
-            memset(ent.oid, 0, sizeof(ent.oid));
-            memcpy(ent.oid, de->d_name, oidchars);
-            hash_search(hash, &ent, HASH_FIND, &found);
+            memset(key.oid, 0, sizeof(key.oid));
+            memcpy(key.oid, de->d_name, oidchars);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
 
-            /* If so, nuke it! */
-            if (found)
+            /* Don't remove files if corresponding init fork is not found */
+            if (!ent)
+                continue;
+
+            if (!ent->dirty)
+            {
+                /* Don't remove clean init file */
+                if (forkNum == INIT_FORKNUM)
+                    continue;
+            }else
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
-                else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                /* Remove dirty init file, together with inittmp file */
+                if (forkNum != INIT_FORKNUM && forkNum != INITTMP_FORKNUM)
+                    continue;
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+            else
+                elog(DEBUG2, "unlinked file \"%s\"", rm_path);
         }
 
         /* Cleanup is complete. */
@@ -273,6 +296,9 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
+        unlogged_relation_entry key;
+        unlogged_relation_entry *ent;
+
         /* Scan the directory. */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
@@ -288,6 +314,38 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
                                                      &forkNum))
                 continue;
 
+            /*
+             * See whether the OID portion of the name shows up in the hash
+             * table.
+             */
+            memset(key.oid, 0, sizeof(key.oid));
+            memcpy(key.oid, de->d_name, oidchars);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            /* Don't init file that doesn't have the init fork. */
+            if (!ent)
+                continue;
+
+            if (ent->dirty &&
+                (forkNum == INIT_FORKNUM || forkNum == INITTMP_FORKNUM))
+            {
+                /*
+                 * The init file is dirty. The files were removed once at
+                 * cleanup time, but recovery can create them again. Remove
+                 * both INIT and INITTMP files.
+                 */
+                snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                         dbspacedirname, de->d_name);
+                if (unlink(rm_path) < 0)
+                    ereport(ERROR,
+                            (errcode_for_file_access(),
+                             errmsg("could not remove file \"%s\": %m",
+                                    rm_path)));
+                else
+                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                continue;
+            }
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index dcc09df0c7..5eb9e97b3d 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -645,6 +645,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/common/relpath.c b/src/common/relpath.c
index ad733d1363..2a5e5fa990 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -34,7 +34,8 @@ const char *const forkNames[] = {
     "main",                        /* MAIN_FORKNUM */
     "fsm",                        /* FSM_FORKNUM */
     "vm",                        /* VISIBILITYMAP_FORKNUM */
-    "init"                        /* INIT_FORKNUM */
+    "init",                        /* INIT_FORKNUM */
+    "itmp"                        /* INITTMP_FORKNUM */
 };
 
 StaticAssertDecl(lengthof(forkNames) == (MAX_FORKNUM + 1),
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 30c38e0ca6..c2259cd7e3 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 7b21cab2e0..d48b5288ce 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -29,6 +29,8 @@
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_BUFPERSISTENCE    0x40
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +38,18 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +65,8 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index 869cabcc0d..f6e1a74a38 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -43,7 +43,8 @@ typedef enum ForkNumber
     MAIN_FORKNUM = 0,
     FSM_FORKNUM,
     VISIBILITYMAP_FORKNUM,
-    INIT_FORKNUM
+    INIT_FORKNUM,
+    INITTMP_FORKNUM
 
     /*
      * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
@@ -52,7 +53,7 @@ typedef enum ForkNumber
      */
 } ForkNumber;
 
-#define MAX_FORKNUM        INIT_FORKNUM
+#define MAX_FORKNUM        INITTMP_FORKNUM
 
 #define FORKNAMECHARS    4        /* max chars for a fork name */
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ee91b8fa26..e2496ed1c8 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -168,6 +168,8 @@ extern PGDLLIMPORT int32 *LocalRefCount;
  */
 #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer))
 
+struct SMgrRelationData;
+
 /*
  * prototypes for functions in bufmgr.c
  */
@@ -205,6 +207,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index f28a842401..5d74631006 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,6 +86,7 @@ extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.18.4
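
For readers following the reinit.c changes above, the cleanup pass boils down to a three-way decision per file. The following is only a minimal sketch of that decision as I read the patch, not code from it: SketchFork and cleanup_should_unlink are hypothetical names, and the hash lookup is reduced to two booleans.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for ForkNumber; the order mirrors the patched enum. */
typedef enum
{
    SK_MAIN_FORKNUM = 0,
    SK_FSM_FORKNUM,
    SK_VISIBILITYMAP_FORKNUM,
    SK_INIT_FORKNUM,
    SK_INITTMP_FORKNUM
} SketchFork;

/*
 * Decide whether the cleanup pass would unlink a file.
 *
 * has_entry: the first pass saw an init or inittmp fork for this OID.
 * dirty:     the first pass saw an inittmp fork for this OID.
 */
static bool
cleanup_should_unlink(bool has_entry, bool dirty, SketchFork fork)
{
    assert(fork >= SK_MAIN_FORKNUM && fork <= SK_INITTMP_FORKNUM);

    /* No init fork at all: not an unlogged relation, leave it alone. */
    if (!has_entry)
        return false;

    /* Clean init fork: remove everything except the init fork itself. */
    if (!dirty)
        return fork != SK_INIT_FORKNUM;

    /* Dirty init fork: remove only the init and inittmp files. */
    return fork == SK_INIT_FORKNUM || fork == SK_INITTMP_FORKNUM;
}
```

Note that in the dirty case the main, FSM and VM forks are kept, which is the point of the inittmp marker: the relation was not yet definitely unlogged, so its data must survive.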

From 5ce0551b9685dcd742bdcdf610ac80424327a9b5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v3 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 src/backend/commands/tablecmds.c | 140 +++++++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c    |  16 ++++
 src/backend/nodes/equalfuncs.c   |  15 ++++
 src/backend/parser/gram.y        |  20 +++++
 src/backend/tcop/utility.c       |  11 +++
 src/include/commands/tablecmds.h |   2 +
 src/include/nodes/nodes.h        |   1 +
 src/include/nodes/parsenodes.h   |   9 ++
 8 files changed, 214 insertions(+)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 29f786142a..ec2a45357b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -13665,6 +13665,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change persistence of all objects in a given tablespace in
+ * the current database.  Objects can be chosen based on the owner of the
+ * object also, to allow users to change the persistence of only their
+ * objects. The main permissions handling is done by the lower-level change
+ * persistence function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(NIL);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * Now that the checks are done, check if we should set it to
+     * InvalidOid because it is our database's default tablespace.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk the list of objects in the tablespace to pick them up. This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick-up objects in pg_catalog as part of this, if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking-up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * Caller must be considered an owner of the table whose persistence
+         * we're going to change.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+                           NameStr(relForm->relname));
+
+        if (stmt->nowait &&
+            !ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+        else
+            LockRelationOid(relOid, AccessExclusiveLock);
+
+        /*
+         * Add to our list of objects whose persistence we're going to
+         * change.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 3031c52991..7bb8fc767b 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4120,6 +4120,19 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5419,6 +5432,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 9aa853748d..55ab3d7039 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1856,6 +1856,18 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_SCALAR_FIELD(logged);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3474,6 +3486,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 051f1f1d49..08da69e32f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1893,6 +1893,26 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index f398027fa6..8066e7a607 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -160,6 +160,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1733,6 +1734,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2616,6 +2623,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index c1581ad178..206de61154 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 7ddd8c011b..74bf050b67 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -424,6 +424,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 7ef9b0eac0..f5b4976ae1 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2235,6 +2235,15 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;
+    ObjectType    objtype;        /* Object type to change */
+    bool        logged;
+    bool        nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
-- 
2.18.4
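
As a reading aid for AlterTableSetLoggedAll above, the pg_class scan keeps a relation only if it passes the filter sketched below. This is a simplified illustration under my own assumptions, not code from the patch: SketchRel and picked_for_persistence_change are hypothetical names, the catalog lookups are replaced by plain booleans, and the owner filter is left out because role_oids appears to be always empty in this version.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, flattened view of the pg_class fields the scan inspects. */
typedef struct
{
    bool in_pg_catalog;       /* IsCatalogNamespace(relnamespace) */
    bool is_shared;           /* relisshared */
    bool in_temp_namespace;   /* isAnyTempNamespace(relnamespace) */
    bool in_toast_namespace;  /* IsToastNamespace(relnamespace) */
    char relkind;             /* 'r' stands for RELKIND_RELATION */
} SketchRel;

/* Return true if the relation would be added to the list to be altered. */
static bool
picked_for_persistence_change(const SketchRel *rel)
{
    assert(rel != NULL);

    /* Catalog, shared, temp and TOAST relations are never picked up. */
    if (rel->in_pg_catalog || rel->is_shared ||
        rel->in_temp_namespace || rel->in_toast_namespace)
        return false;

    /* Only ordinary tables are handled by this command. */
    return rel->relkind == 'r';
}
```

TOAST relations are skipped here because, as the patch comment says, they are changed together with their main table.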

