Re: In-placre persistance change of a relation - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: In-placre persistance change of a relation
Date
Msg-id 20210325.140805.1356865766630580851.horikyota.ntt@gmail.com
Whole thread Raw
In response to Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: In-placre persistance change of a relation
List pgsql-hackers
(I'm not sure when the subject was broken..)

At Thu, 14 Jan 2021 17:32:17 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> Commit bea449c635 conflicts with this on the change of the definition
> of DropRelFileNodeBuffers. The change simplified this patch by a bit:p

In this version, I got rid of the "CLEANUP FORK"s, and added a new
system "Smgr marks".  The mark files have the name of the
corresponding fork file followed by ".u" (which means Uncommitted.).
"Uncommited"-marked main fork means the same as the CLEANUP2_FORKNUM
and uncommitted-marked init fork means the same as the CLEANUP_FORKNUM
in the previous version.x

I noticed that the previous version of the patch still leaves an
orphan main fork file after "BEGIN; CREATE TABLE x; ROLLBACK; (crash
before checkpoint)" since the "mark" file (or CLEANUP2_FORKNUM) is
revmoed at rollback.  In this version the responsibility to remove the
mark files is moved to SyncPostCheckpoint, where main fork files are
actually removed.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 27ea96d84dfc2f3e0d62c4b8f7d20cc30771cf86 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v6 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, currently it runs heap rewrite which causes large amount of
file I/O.  This patch makes the command run without heap rewrite.
Addition to that, SET LOGGED while wal_level > minimal emits WAL using
XLOG_FPI instead of massive number of HEAP_INSERT's, which should be
smaller.

Also this allows for the cleanup of files left behind in the crash of
the transaction that created it.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  52 +++
 src/backend/access/transam/README      |   8 +
 src/backend/access/transam/xlog.c      |  17 +
 src/backend/catalog/storage.c          | 520 +++++++++++++++++++++++--
 src/backend/commands/tablecmds.c       | 246 ++++++++++--
 src/backend/replication/basebackup.c   |   3 +-
 src/backend/storage/buffer/bufmgr.c    |  88 +++++
 src/backend/storage/file/fd.c          |   4 +-
 src/backend/storage/file/reinit.c      | 346 +++++++++++-----
 src/backend/storage/smgr/md.c          |  92 ++++-
 src/backend/storage/smgr/smgr.c        |  32 ++
 src/backend/storage/sync/sync.c        |  20 +-
 src/bin/pg_rewind/parsexlog.c          |  24 ++
 src/common/relpath.c                   |  47 ++-
 src/include/catalog/storage.h          |   2 +
 src/include/catalog/storage_xlog.h     |  42 +-
 src/include/common/relpath.h           |   9 +-
 src/include/storage/bufmgr.h           |   2 +
 src/include/storage/fd.h               |   1 +
 src/include/storage/md.h               |   8 +-
 src/include/storage/reinit.h           |  10 +-
 src/include/storage/smgr.h             |  17 +
 22 files changed, 1384 insertions(+), 206 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7755553d57..d251f22207 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,49 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+        char       *path = GetRelationPath(xlrec->rnode.dbNode,
+                                           xlrec->rnode.spcNode,
+                                           xlrec->rnode.relNode,
+                                           InvalidBackendId,
+                                           xlrec->forkNum, xlrec->mark);
+        char       *action;
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                action = "CREATE";
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                action = "DELETE";
+                break;
+            default:
+                action = "<unknown action>";
+                break;
+        }
+
+        appendStringInfo(buf, "%s %s", action, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +98,15 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_MARK:
+            id = "MARK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..7cf77e4a02 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -724,6 +724,14 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+The Smgr MARK files
+--------------------------------
+
+A smgr mark files is created when a new relation file is created to
+mark the relfilenode needs to be cleaned up at recovery time.  In
+contrast to 4 above, failure to remove smgr mark files will lead to
+data loss, in which case the server will shut down.
+
 
 Skipping WAL for New RelFileNode
 --------------------------------
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f8810e149..27bbe17395 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "catalog/storage.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
@@ -4458,6 +4459,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
             {
                 ereport(DEBUG1,
                         (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+                /* cleanup garbage files left during crash recovery */
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+                /* run rollback cleanup if any */
+                smgrDoPendingDeletes(false);
+
                 InArchiveRecovery = true;
                 if (StandbyModeRequested)
                     StandbyMode = true;
@@ -7577,6 +7586,14 @@ StartupXLOG(void)
                 }
             }
 
+            /* cleanup garbage files left during crash recovery */
+            if (!InArchiveRecovery)
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+            /* run rollback cleanup if any */
+            smgrDoPendingDeletes(false);
+
             /* Allow resource managers to do any required cleanup. */
             for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
             {
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index cba7a9ada0..7302a3fad4 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -27,6 +28,7 @@
 #include "access/xlogutils.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
+#include "common/hashfn.h"
 #include "miscadmin.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
@@ -57,9 +59,18 @@ int            wal_skip_threshold = 2048;    /* in kilobytes */
  * but I'm being paranoid.
  */
 
+#define    PDOP_DELETE                (1 << 0)
+#define    PDOP_UNLINK_FORK        (1 << 1)
+#define    PDOP_UNLINK_MARK        (1 << 2)
+#define    PDOP_SET_PERSISTENCE    (1 << 3)
+
 typedef struct PendingRelDelete
 {
     RelFileNode relnode;        /* relation that may need to be deleted */
+    int            op;                /* operation mask */
+    bool        bufpersistence;    /* buffer persistence to set */
+    int            unlink_forknum;    /* forknum to unlink */
+    StorageMarks unlink_mark;    /* mark to unlink */
     BackendId    backend;        /* InvalidBackendId if not a temp rel */
     bool        atCommit;        /* T=delete at commit; F=delete at abort */
     int            nestLevel;        /* xact nesting level of request */
@@ -75,6 +86,24 @@ typedef struct PendingRelSync
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 HTAB       *pendingSyncHash = NULL;
 
+typedef struct SRelHashEntry
+{
+    SMgrRelation  srel;
+    char          status;    /* for simplehash use */
+} SRelHashEntry;
+
+/* define hashtable for workarea for pending deletes */
+#define SH_PREFIX srelhash
+#define SH_ELEMENT_TYPE SRelHashEntry
+#define SH_KEY_TYPE SMgrRelation
+#define SH_KEY srel
+#define SH_HASH_KEY(tb, key) \
+    hash_bytes((unsigned char *)&key, sizeof(SMgrRelation))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(SMgrRelation)) == 0)
+#define SH_SCOPE static inline
+#define SH_DEFINE
+#define SH_DECLARE
+#include "lib/simplehash.h"
 
 /*
  * AddPendingSync
@@ -143,22 +172,48 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
             return NULL;        /* placate compiler */
     }
 
+    /*
+     * We are going to create a new storage file. If server crashes before the
+     * current transaction ends the file needs to be cleaned up but there's no
+     * clue to the orphan files. The SMGR_MARK_UNCOMMITED mark file works as
+     * the signal of that situation.
+     */
     srel = smgropen(rnode, backend);
+    log_smgrcreatemark(&rnode, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
     smgrcreate(srel, MAIN_FORKNUM, false);
 
     if (needs_wal)
         log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
-    /* Add the relation to the list of stuff to delete at abort */
+    /*
+     * Add the relation to the list of stuff to delete at abort. We don't
+     * remove the mark file at commit. It needs to persiste until the main fork
+     * file is actually deleted.  See SyncPostCheckpoint.
+     */
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rnode;
+    pending->op = PDOP_DELETE;
     pending->backend = backend;
     pending->atCommit = false;    /* delete if abort */
     pending->nestLevel = GetCurrentTransactionNestLevel();
     pending->next = pendingDeletes;
     pendingDeletes = pending;
 
+    /* drop cleanup fork at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_MARK;
+    pending->unlink_forknum = MAIN_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = backend;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
     if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
     {
         Assert(backend == InvalidBackendId);
@@ -168,6 +223,207 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    SMgrRelation srel;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, false, false);
+
+    /*
+     * If we have entries for init-fork operation of this relation, that means
+     * that we have already registered pending delete entries to drop
+     * preexisting init fork since before the current transaction started. This
+     * function reverts that change just by removing the entries.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            (pending->op & PDOP_DELETE) == 0 &&
+            (pending->unlink_forknum == INIT_FORKNUM ||
+             (pending->op & PDOP_SET_PERSISTENCE) != 0))
+        {
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            create = false;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (!create)
+        return;
+
+    /*
+     * We are going to create the init fork. If server crashes before the
+     * current transaction ends the init fork left alone corrupts data while
+     * recovery.  The cleanup fork works as the sentinel to identify that
+     * situation.
+     */
+    srel = smgropen(rnode, InvalidBackendId);
+    log_smgrcreatemark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+    /* We don't have existing init fork, create it. */
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * index-init fork needs further initialization. ambuildempty shoud do
+     * WAL-log and file sync by itself but otherwise we do that by ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /* drop the init fork, mark file and revert persistence at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK | PDOP_UNLINK_MARK | PDOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop mark file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_MARK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingDeletes at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(rel->rd_smgr, true, false);
+
+    /*
+     * If we have entries for init-fork operations of this relation, that means
+     * that we have created the init fork in the current transaction.  We
+     * remove the init fork and mark file immediately in that case.  Otherwise
+     * just reister pending-delete for the existing init fork.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            (pending->op & PDOP_DELETE) == 0 &&
+            (pending->unlink_forknum == INIT_FORKNUM ||
+             (pending->op & PDOP_SET_PERSISTENCE) != 0))
+        {
+            /* unlink list entry */
+            if (prev)
+                prev->next = next;
+            else
+                pendingDeletes = next;
+            pfree(pending);
+
+            inxact_created = true;
+        }
+        else
+        {
+            /* unrelated entry, don't touch it */
+            prev = pending;
+        }
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+        /*
+         * INIT forks never be loaded to shared buffer so no point in dropping
+         * buffers for such files.
+         */
+        log_smgrunlinkmark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+        smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* revert buffer-persistence changes at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +443,88 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file unlink.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_CREATEMARK record to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINKMARK record to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the change of buffer persistence.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +538,7 @@ RelationDropStorage(Relation rel)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rel->rd_node;
+    pending->op = PDOP_DELETE;
     pending->backend = rel->rd_backend;
     pending->atCommit = true;    /* delete if commit */
     pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -602,59 +941,104 @@ smgrDoPendingDeletes(bool isCommit)
     int            nrels = 0,
                 maxrels = 0;
     SMgrRelation *srels = NULL;
+    srelhash_hash  *close_srels = NULL;
+    bool            found;
 
     prev = NULL;
     for (pending = pendingDeletes; pending != NULL; pending = next)
     {
+        SMgrRelation srel;
+
         next = pending->next;
         if (pending->nestLevel < nestLevel)
         {
             /* outer-level entries should not be processed yet */
             prev = pending;
+            continue;
         }
+
+        /* unlink list entry first, so we don't retry on failure */
+        if (prev)
+            prev->next = next;
         else
+            pendingDeletes = next;
+
+        if (pending->atCommit != isCommit)
         {
-            /* unlink list entry first, so we don't retry on failure */
-            if (prev)
-                prev->next = next;
-            else
-                pendingDeletes = next;
-            /* do deletion if called for */
-            if (pending->atCommit == isCommit)
-            {
-                SMgrRelation srel;
-
-                srel = smgropen(pending->relnode, pending->backend);
-
-                /* allocate the initial array, or extend it, if needed */
-                if (maxrels == 0)
-                {
-                    maxrels = 8;
-                    srels = palloc(sizeof(SMgrRelation) * maxrels);
-                }
-                else if (maxrels <= nrels)
-                {
-                    maxrels *= 2;
-                    srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-                }
-
-                srels[nrels++] = srel;
-            }
             /* must explicitly free the list entry */
             pfree(pending);
             /* prev does not change */
+            continue;
         }
+
+        if (close_srels == NULL)
+            close_srels = srelhash_create(CurrentMemoryContext, 32, NULL);
+
+        srel = smgropen(pending->relnode, pending->backend);
+
+        /* Uniquify the smgr relations */
+        srelhash_insert(close_srels, srel, &found);
+
+        if (pending->op & PDOP_DELETE)
+        {
+            /* allocate the initial array, or extend it, if needed */
+            if (maxrels == 0)
+            {
+                maxrels = 8;
+                srels = palloc(sizeof(SMgrRelation) * maxrels);
+            }
+            else if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            srels[nrels++] = srel;
+        }
+
+        if (pending->op & PDOP_UNLINK_FORK)
+        {
+            /* other forks needs to drop buffers */
+            Assert(pending->unlink_forknum == INIT_FORKNUM);
+
+            /* Don't emit wal while recovery. */
+            if (!InRecovery)
+                log_smgrunlink(&pending->relnode, pending->unlink_forknum);
+            smgrunlink(srel, pending->unlink_forknum, false);
+        }
+
+        if (pending->op & PDOP_UNLINK_MARK)
+        {
+            if (!InRecovery)
+                log_smgrunlinkmark(&pending->relnode,
+                                   pending->unlink_forknum,
+                                   pending->unlink_mark);
+            smgrunlinkmark(srel, pending->unlink_forknum,
+                           pending->unlink_mark, InRecovery);
+        }
+
+        if (pending->op & PDOP_SET_PERSISTENCE)
+            SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                          InRecovery);
     }
 
     if (nrels > 0)
     {
         smgrdounlinkall(srels, nrels, false);
-
-        for (int i = 0; i < nrels; i++)
-            smgrclose(srels[i]);
-
         pfree(srels);
     }
+
+    if (close_srels)
+    {
+        srelhash_iterator i;
+        SRelHashEntry     *ent;
+
+        /* close smgr relatoins */
+        srelhash_start_iterate(close_srels, &i);
+        while ((ent = srelhash_iterate(close_srels, &i)) != NULL)
+            smgrclose(ent->srel);
+        srelhash_destroy(close_srels);
+    }
 }
 
 /*
@@ -824,7 +1208,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId
+            && pending->op == PDOP_DELETE)
             nrels++;
     }
     if (nrels == 0)
@@ -837,7 +1222,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId &&
+            pending->op == PDOP_DELETE)
         {
             *rptr = pending->relnode;
             rptr++;
@@ -917,6 +1303,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrunlink(reln, xlrec->forkNum, true);
+        smgrclose(reln);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1005,6 +1400,65 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingRelDelete *pending;
+        bool        created = false;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+                created = true;
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+                break;
+            default:
+                elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->mark);
+        }
+
+        if (created)
+        {
+            /* revert mark file operation at abort */
+            pending = (PendingRelDelete *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+            pending->relnode = xlrec->rnode;
+            pending->op = PDOP_UNLINK_MARK;
+            pending->unlink_forknum = xlrec->forkNum;
+            pending->unlink_mark = xlrec->mark;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingDeletes;
+            pendingDeletes = pending;
+        }
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingRelDelete *pending;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+        /* revert buffer-persistence changes at abort */
+        pending = (PendingRelDelete *)
+            MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+        pending->relnode = xlrec->rnode;
+        pending->op = PDOP_SET_PERSISTENCE;
+        pending->bufpersistence = !xlrec->persistence;
+        pending->backend = InvalidBackendId;
+        pending->atCommit = false;
+        pending->nestLevel = GetCurrentTransactionNestLevel();
+        pending->next = pendingDeletes;
+        pendingDeletes = pending;
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3349bcfaa7..4e2bceffda 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -51,6 +51,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5085,6 +5086,170 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation     rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    int            i;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * Under the following condition, we need to call ATRewriteTable, which
+     * cannot be false in the AT_REWRITE_ALTER_PERSISTENCE case.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform im-place persistnce change");
+
+    RelationOpenSmgr(rel);
+
+    /*
+     * First we collect all relations that we need to change persistence.
+     */
+
+    /* Collect OIDs of indexes and toast relations */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(rel->rd_id, relids);
+
+    /* Add toast relation if any */
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        List    *toastidx;
+        Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+        RelationOpenSmgr(toastrel);
+        relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+        toastidx = RelationGetIndexList(toastrel);
+        relids = list_concat(relids, toastidx);
+        pfree(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, NoLock);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach (lc_oid, relids)
+    {
+        Oid reloid = lfirst_oid(lc_oid);
+        Relation r = relation_open(reloid, lockmode);
+        
+        /* 
+         * Some access methods do not accept in-place persistence change. For
+         * example, GiST uses page LSNs to figure out whether a block has
+         * changed, where UNLOGGED GiST indexes use fake LSNs that are
+         * incompatible with real LSNs used for LOGGED ones.
+         *
+         * XXXX: We don't bother allowing in-place persistence change for index
+         * methods other than btree for now.
+         */
+        if (r->rd_rel->relkind == RELKIND_INDEX &&
+            r->rd_rel->relam != BTREE_AM_OID)
+        {
+            int            reindex_flags;
+
+            /* reindex doesn't allow concurrent use of the index */
+            table_close(r, NoLock);
+
+            reindex_flags =
+                REINDEX_REL_SUPPRESS_INDEX_USE |
+                REINDEX_REL_CHECK_CONSTRAINTS;
+
+            /* Set the same persistence with the parent relation. */
+            if (persistence == RELPERSISTENCE_UNLOGGED)
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+            else
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+            reindex_index(reloid, reindex_flags, persistence, 0);
+
+            continue;
+        }
+
+        RelationOpenSmgr(r);
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation gets WAL-logged, immediately sync all files but
+         * initfork to establish the initial state on storage.  Buffers have
+         * already flushed out by RelationCreate(Drop)InitFork called just
+         * above. Initfork should have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (i = 0 ; i < INIT_FORKNUM ; i++)
+            {
+                if (smgrexists(r->rd_smgr, i))
+                    smgrimmedsync(r->rd_smgr, i);
+            }
+        }
+
+        /* Update catalog */
+        tuple = SearchSysCacheCopy1(RELOID,    ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+
+        /*
+         * While wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged to recover the table.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            ForkNumber fork;
+
+            for (fork = 0; fork < INIT_FORKNUM ; fork++)
+            {
+                if (smgrexists(r->rd_smgr, fork))
+                    log_newpage_range(r, fork,
+                                      0, smgrnblocks(r->rd_smgr, fork), false);
+            }
+        }
+
+        table_close(r, NoLock);
+    }
+
+    table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5205,45 +5370,52 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
-                                       lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
+                                           lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
+            }
         }
         else
         {
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 56cd473f9f..bc5288de05 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1255,6 +1255,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
         bool        excludeFound;
         ForkNumber    relForkNum; /* Type of fork if file is a relation */
         int            relOidChars;    /* Chars in filename that are the rel oid */
+        StorageMarks mark;
 
         /* Skip special stuff */
         if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1305,7 +1306,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
         /* Exclude all forks for unlogged tables except the init fork */
         if (isDbDir &&
             parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
-                                                &relForkNum))
+                                                &relForkNum, &mark))
         {
             /* Never exclude init forks */
             if (relForkNum != INIT_FORKNUM)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 852138f9c9..50674fd027 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3100,6 +3101,93 @@ DropRelFileNodeBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelFileNodeBuffersPersistence
+ *
+ *        This function changes the persistence of all buffer pages of a relation
+ *        then writes all dirty pages of the relation out to disk when switching
+ *        to PERMANENT. (or more accurately, out to kernel disk buffers),
+ *        ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently it sequentially searches the buffer pool, should be
+ *        changed to more clever ways of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert (!RelFileNodeBackendIsTemp(rnode));
+
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* init fork is always BM_PERMANENT. See BufferAlloc */
+            if (bufHdr->tag.forkNum != INIT_FORKNUM)
+                buf_state &= ~BM_PERMANENT;
+
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 06b57ae71f..bdf6916d63 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -342,8 +342,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int    fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3647,7 +3645,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
     char        parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 40c758d789..f52d2ac199 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,50 @@
 
 #include <unistd.h>
 
+#include "access/xlog.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-                                                  int op);
+                                                  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-                                               int op);
+                                               Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
     Oid            reloid;            /* hash key */
-} unlogged_relation_entry;
+    bool        has_init;        /* has INIT fork */
+    bool        dirty_init;        /* needs to remove INIT fork */
+    bool        dirty_all;        /* needs to remove all forks */
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of the "cleanup" forks.
  *
+ * If SMGR_MARK_UNCOMMITTED mark file for init fork is present, we remove the
+ * init fork along with the mark file.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
+ * whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found.  we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have the "cleanup" and/or the "init" forks.
+ * 
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
  */
@@ -68,7 +89,7 @@ ResetUnloggedRelations(int op)
     /*
      * First process unlogged files in pg_default ($PGDATA/base)
      */
-    ResetUnloggedRelationsInTablespaceDir("base", op);
+    ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
     /*
      * Cycle through directories for all non-default tablespaces.
@@ -77,13 +98,19 @@ ResetUnloggedRelations(int op)
 
     while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
     {
+        Oid tspid;
+
         if (strcmp(spc_de->d_name, ".") == 0 ||
             strcmp(spc_de->d_name, "..") == 0)
             continue;
 
         snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
                  spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-        ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+        tspid = atooid(spc_de->d_name);
+        Assert(tspid != 0);
+
+        ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
     }
 
     FreeDir(spc_dir);
@@ -99,7 +126,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+                                      Oid tspid, int op)
 {
     DIR           *ts_dir;
     struct dirent *de;
@@ -126,6 +154,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
     while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
     {
+        Oid dbid;
+
         /*
          * We're only interested in the per-database directories, which have
          * numeric names.  Note that this code will also (properly) ignore "."
@@ -136,7 +166,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
         snprintf(dbspace_path, sizeof(dbspace_path), "%s/%s",
                  tsdirname, de->d_name);
-        ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+        dbid = atooid(de->d_name);
+        Assert(dbid != 0);
+
+        ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
     }
 
     FreeDir(ts_dir);
@@ -146,125 +179,228 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+                                   Oid tspid, Oid dbid, int op)
 {
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
-    Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+    Assert((op & (UNLOGGED_RELATION_CLEANUP |
+                  UNLOGGED_RELATION_DROP_BUFFER |
+                  UNLOGGED_RELATION_INIT)) != 0);
 
     /*
      * Cleanup is a two-pass operation.  First, we go through and identify all
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create a ton of unlogged relations
+     * in the same database & tablespace, so we'd better use a hash table
+     * rather than an array or linked list to keep track of which files
+     * need to be reset.  Otherwise, this cleanup operation would be
+     * O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(Oid);
+    ctl.entrysize = sizeof(relfile_entry);
+    hash = hash_create("relfilenode cleanup hash",
+                       32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+    /* Collect INIT fork and mark files in the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        int            oidchars;
+        ForkNumber    forkNum;
+        StorageMarks mark;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum, &mark))
+            continue;
+
+        if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+        {
+            Oid                key;
+            relfile_entry  *ent;
+            bool            found;
+
+            /*
+             * Record the relfilenode information. If it has
+             * SMGR_MARK_UNCOMMITTED mark files, the relfilenode is in dirty
+             * state, where clean up is needed.
+             */
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+            if (!found)
+            {
+                ent->has_init = false;
+                ent->dirty_init = false;
+                ent->dirty_all = false;
+            }
+
+            if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_init = true;
+            else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_all = true;
+            else
+            {
+                Assert(forkNum == INIT_FORKNUM);
+                ent->has_init = true;
+            }
+        }
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /* nothing to do if we don't have init nor cleanup forks */
+    if (hash_get_num_entries(hash) < 1)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
+    if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+    {
+        /*
+         * When we come here after recovery, smgr object for this file might
+         * have been created. In that case we need to drop all buffers then the
+         * smgr object before initializing the unlogged relation.  This is safe
+         * as far as no other backends have accessed the relation before
+         * starting archive recovery.
+         */
+        HASH_SEQ_STATUS status;
+        relfile_entry *ent;
+        SMgrRelation   *srels = palloc(sizeof(SMgrRelation) * 8);
+        int               maxrels = 8;
+        int               nrels = 0;
+        int i;
+
+        Assert(!HotStandbyActive());
+
+        hash_seq_init(&status, hash);
+        while((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+        {
+            RelFileNodeBackend rel;
+
+            /*
+             * The relation is persistent and stays remain persistent. Don't
+             * drop the buffers for this relation.
+             */
+            if (ent->has_init && ent->dirty_init)
+                continue;
+
+            if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = ent->reloid;
+
+            srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+        }
+
+        DropRelFileNodesAllBuffers(srels, nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            smgrclose(srels[i]);
+    }
+
+    /*
+     * Now, make a second pass and remove anything that matches.
+     */
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        ctl.keysize = sizeof(Oid);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        ctl.hcxt = CurrentMemoryContext;
-        hash = hash_create("unlogged relation OIDs", 32, &ctl,
-                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-        /* Scan the directory. */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
+            ForkNumber        forkNum;
+            StorageMarks    mark;
+            int                oidchars;
+            Oid                key;
+            relfile_entry  *ent;
+            RelFileNodeBackend rel;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            ent.reloid = atooid(de->d_name);
-            (void) hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
-        /*
-         * Now, make a second pass and remove anything that matches.
-         */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
+                                                     &forkNum, &mark))
                 continue;
 
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.  If so, nuke it!
              */
-            ent.reloid = atooid(de->d_name);
-            if (hash_search(hash, &ent, HASH_FIND, NULL))
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            if (!ent)
+                continue;
+
+            if (!ent->dirty_all)
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
+                /* clean permanent relations don't need cleanup */
+                if (!ent->has_init)
+                    continue;
+
+                if (ent->dirty_init)
+                {
+                    /*
+                     * The crashed trasaction did SET UNLOGGED. This relation
+                     * is restored to a LOGGED relation.
+                     */
+                    if (forkNum != INIT_FORKNUM)
+                        continue;
+                }
                 else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                {
+                    /*
+                     * we don't remove the INIT fork of a non-dirty
+                     * relfilenode
+                     */
+                    if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+                        continue;
+                }
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = atooid(de->d_name);
+
+            ForgetRelationForkSyncRequests(rel, forkNum);
         }
 
         /* Cleanup is complete. */
         FreeDir(dbspace_dir);
-        hash_destroy(hash);
     }
 
+    hash_destroy(hash);
+    hash = NULL;
+
     /*
      * Initialization happens after cleanup is complete: we copy each init
-     * fork file to the corresponding main fork file.  Note that if we are
-     * asked to do both cleanup and init, we may never get here: if the
-     * cleanup code determines that there are no init forks in this dbspace,
-     * it will return before we get to this point.
+     * fork file to the corresponding main fork file.
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
@@ -273,6 +409,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        srcpath[MAXPGPATH * 2];
@@ -280,9 +417,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -316,15 +455,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        mainpath[MAXPGPATH];
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -367,7 +509,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *oidchars,
-                                    ForkNumber *fork)
+                                    ForkNumber *fork, StorageMarks *mark)
 {
     int            pos;
 
@@ -398,11 +540,19 @@ parse_filename_for_nontemp_relation(const char *name, int *oidchars,
 
         for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
             ;
-        if (segchar <= 1)
-            return false;
-        pos += segchar;
+        if (segchar > 1)
+            pos += segchar;
     }
 
+    /* mark file? */
+    if (name[pos] == '.' && name[pos + 1] != 0)
+    {
+        *mark = name[pos + 1];
+        pos += 2;
+    }
+    else
+        *mark = SMGR_MARK_NONE;
+
     /* Now we should be at the end. */
     if (name[pos] != '\0')
         return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 1e12cfad8e..87a777b307 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -139,7 +139,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
                              BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
                               MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+                         StorageMarks mark);
 
 /*
  *    mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -169,6 +170,80 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
     return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path =markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    /* See mdcreate for details.. */
+    TablespaceCreateDbspace(reln->smgr_rnode.node.spcNode,
+                            reln->smgr_rnode.node.dbNode,
+                            isRedo);
+
+    fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+    if (fd < 0 && (!isRedo || errno != EEXIST))
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not crete mark file \"%s\": %m", path)));
+
+    pfree(path);
+    pg_fsync(fd);
+    close(fd);
+
+    /*
+     * To guarantee that the creation of the file is persistent, fsync its
+     * parent directory.
+     */
+    fsync_parent_path(path, ERROR);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file
+ *
+ * If isRedo is true, it's okay for the file being not found.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+
+    if (!isRedo || mdmarkexists(reln, forkNum, mark))
+        durable_unlink(path, ERROR);
+
+    pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check if the file exists.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    fd = BasicOpenFile(path, O_RDONLY);
+    if (fd < 0 && errno != ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not access mark file \"%s\": %m", path)));
+    pfree(path);
+
+    if (fd < 0)
+        return false;
+
+    return true;
+}
+
 /*
  *    mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1024,6 +1099,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
     RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+    register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1377,12 +1461,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
     char       *p;
 
     /* Compute the path. */
-    p = relpathperm(ftag->rnode, MAIN_FORKNUM);
+    p = GetRelationPath(ftag->rnode.dbNode, ftag->rnode.spcNode,
+                        ftag->rnode.relNode, InvalidBackendId, MAIN_FORKNUM,
+                        mark);
     strlcpy(path, p, MAXPGPATH);
     pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 4dc24649df..dd3496cf51 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -62,6 +62,10 @@ typedef struct f_smgr
     void        (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
                                   BlockNumber nblocks);
     void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+    void        (*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
+    void        (*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -82,6 +86,8 @@ static const f_smgr smgrsw[] = {
         .smgr_nblocks = mdnblocks,
         .smgr_truncate = mdtruncate,
         .smgr_immedsync = mdimmedsync,
+        .smgr_createmark = mdcreatemark,
+        .smgr_unlinkmark = mdunlinkmark,
     }
 };
 
@@ -335,6 +341,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
     smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *    smgrcreatemark() -- Create a mark file
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *    smgrunlinkmark() -- Delete a mark file
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *    smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -662,6 +688,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index 708215614d..a23c03ca3e 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -88,7 +88,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
     int            (*sync_syncfiletag) (const FileTag *ftag, char *path);
-    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+                                       StorageMarks mark);
     bool        (*sync_filetagmatches) (const FileTag *ftag,
                                         const FileTag *candidate);
 } SyncOps;
@@ -216,7 +217,8 @@ SyncPostCheckpoint(void)
 
         /* Unlink the file */
         if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-                                                          path) < 0)
+                                                          path,
+                                                          SMGR_MARK_NONE) < 0)
         {
             /*
              * There's a race condition, when the database is dropped at the
@@ -230,6 +232,20 @@ SyncPostCheckpoint(void)
                         (errcode_for_file_access(),
                          errmsg("could not remove file \"%s\": %m", path)));
         }
+        else if (syncsw[entry->tag.handler].sync_unlinkfiletag(
+                     &entry->tag, path,
+                     SMGR_MARK_UNCOMMITTED) < 0)
+        {
+            /*
+             * And we may have SMGR_MARK_UNCOMMITTED file.  Remove it if the
+             * fork files has been successfully removed. It's ok if the file
+             * does not exist.
+             */
+            if (errno != ENOENT)
+                ereport(WARNING,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m", path)));
+        }
 
         /* And remove the list entry */
         pendingUnlinks = list_delete_first(pendingUnlinks);
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6a..db6b658489 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -407,6 +407,30 @@ extractPageInfo(XLogReaderState *record)
          * source system.
          */
     }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
     else if (rmid == RM_XACT_ID &&
              ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
               (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 1f5c426ec0..67f24890d6 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbNode, Oid spcNode)
  */
 char *
 GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                int backendId, ForkNumber forkNumber)
+                int backendId, ForkNumber forkNumber, char mark)
 {
     char       *path;
+    char        markstr[10];
+
+    if (mark == 0)
+        markstr[0] = 0;
+    else
+        snprintf(markstr, 10, ".%c", mark);
 
     if (spcNode == GLOBALTABLESPACE_OID)
     {
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         Assert(dbNode == 0);
         Assert(backendId == InvalidBackendId);
         if (forkNumber != MAIN_FORKNUM)
-            path = psprintf("global/%u_%s",
-                            relNode, forkNames[forkNumber]);
+            path = psprintf("global/%u_%s%s",
+                            relNode, forkNames[forkNumber], markstr);
         else
-            path = psprintf("global/%u", relNode);
+            path = psprintf("global/%u%s", relNode, markstr);
     }
     else if (spcNode == DEFAULTTABLESPACE_OID)
     {
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/%u_%s",
+                path = psprintf("base/%u/%u_%s%s",
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/%u",
-                                dbNode, relNode);
+                path = psprintf("base/%u/%u%s",
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/t%d_%u_%s",
+                path = psprintf("base/%u/t%d_%u_%s%s",
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/t%d_%u",
-                                dbNode, backendId, relNode);
+                path = psprintf("base/%u/t%d_%u%s",
+                                dbNode, backendId, relNode, markstr);
         }
     }
     else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, relNode);
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, backendId, relNode);
+                                dbNode, backendId, relNode, markstr);
         }
     }
+
     return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 0ab32b44e9..382623159c 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index f0814f1458..12346ed7f6 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence change here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, there's a case where
+ * init forks are deleted outside control of transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_MARK        0x40
+#define XLOG_SMGR_BUFPERSISTENCE    0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,32 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+    XLOG_SMGR_MARK_CREATE = 'c',
+    XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+    RelFileNode     rnode;
+    ForkNumber        forkNum;
+    StorageMarks    mark;
+    smgr_mark_action action;
+} xl_smgr_mark;
+
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +83,12 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a44be11ca0..106a5cf508 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -67,7 +67,7 @@ extern int    forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbNode, Oid spcNode);
 
 extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                             int backendId, ForkNumber forkNumber);
+                             int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -77,7 +77,7 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 /* First argument is a RelFileNode */
 #define relpathbackend(rnode, backend, forknum) \
     GetRelationPath((rnode).dbNode, (rnode).spcNode, (rnode).relNode, \
-                    backend, forknum)
+                    backend, forknum, 0)
 
 /* First argument is a RelFileNode */
 #define relpathperm(rnode, forknum) \
@@ -87,4 +87,9 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 #define relpath(rnode, forknum) \
     relpathbackend((rnode).node, (rnode).backend, forknum)
 
+/* First argument is a RelFileNodeBackend */
+#define markpath(rnode, forknum, mark)                                \
+    GetRelationPath((rnode).node.dbNode, (rnode).node.spcNode, \
+                    (rnode).node.relNode, \
+                    (rnode).backend, forknum, mark)
 #endif                            /* RELPATH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index fb00fda6a7..ccb0a388f6 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -205,6 +205,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(struct SMgrRelationData **smgr_reln, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 328473bdc9..485c58e5f1 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -167,6 +167,7 @@ extern ssize_t pg_pwritev_with_retry(int fd,
 extern int    pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int    fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int    fsync_parent_path(const char *fname, int elevel);
 extern int    durable_rename(const char *oldfile, const char *newfile, int loglevel);
 extern int    durable_unlink(const char *fname, int loglevel);
 extern int    durable_rename_excl(const char *oldfile, const char *newfile, int loglevel);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 752b440864..99620816b5 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -23,6 +23,10 @@
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool    isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
@@ -41,12 +45,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+                                           ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int    mdsyncfiletag(const FileTag *ftag, char *path);
-extern int    mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int    mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif                            /* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index fad1e5c473..e1f97e9b89 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,13 +16,15 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
-extern bool parse_filename_for_nontemp_relation(const char *name,
-                                                int *oidchars, ForkNumber *fork);
+extern bool parse_filename_for_nontemp_relation(const char *name, int *oidchars,
+                                                ForkNumber *fork,
+                                                StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP        0x0001
-#define UNLOGGED_RELATION_INIT            0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
+#define UNLOGGED_RELATION_INIT            0x0004
 
 #endif                            /* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a6fbf7b6a6..201ecace8a 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
+/*
+ * Storage marks is a file of which existence suggests something about a
+ * file. The name of such files is "<filename>.<mark>", where the mark is one
+ * of the values of StorageMarks. Since ".<digit>" means segment files so don't
+ * use digits for the mark character.
+ */
+typedef enum StorageMarks
+{
+    SMGR_MARK_NONE = 0,
+    SMGR_MARK_UNCOMMITTED = 'u'    /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -85,7 +97,12 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.27.0

From 625bbc0e05a698aa2c19b5fba4947009358bd560 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v6 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 src/backend/commands/tablecmds.c | 140 +++++++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c    |  16 ++++
 src/backend/nodes/equalfuncs.c   |  15 ++++
 src/backend/parser/gram.y        |  20 +++++
 src/backend/tcop/utility.c       |  11 +++
 src/include/commands/tablecmds.h |   2 +
 src/include/nodes/nodes.h        |   1 +
 src/include/nodes/parsenodes.h   |   9 ++
 8 files changed, 214 insertions(+)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 4e2bceffda..26bf8298e9 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -13843,6 +13843,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change persistence of all objects in a given tablespace in
+ * the current database.  Objects can be chosen based on the owner of the
+ * object also, to allow users to change persistene only their objects. The
+ * main permissions handling is done by the lower-level change persistence
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(NIL);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * Now that the checks are done, check if we should set either to
+     * InvalidOid because it is our database's default tablespace.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk the list of objects in the tablespace to pick up them. This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick-up objects in pg_catalog as part of this, if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking-up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * Caller must be considered an owner on the table of which we're going
+         * to change persistence.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+                           NameStr(relForm->relname));
+
+        if (stmt->nowait &&
+            !ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+        else
+            LockRelationOid(relOid, AccessExclusiveLock);
+
+        /*
+         * Add to our list of objects of which we're going to change
+         * persistence.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 82d7cce5d5..3471b8e2cc 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4197,6 +4197,19 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5503,6 +5516,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3e980c457c..d05aef4fde 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1875,6 +1875,18 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_SCALAR_FIELD(logged);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3528,6 +3540,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index bc43641ffe..5c3fd1998e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1948,6 +1948,26 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 05bb698cf4..3c18312367 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -161,6 +161,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1694,6 +1695,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2582,6 +2589,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index b3d30acc35..b4af2db6f0 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e22df890ef..91dfc77978 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -427,6 +427,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 68425eb2c0..b9b75dc45b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2293,6 +2293,15 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;
+    ObjectType    objtype;        /* Object type to move */
+    bool        logged;
+    bool        nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
-- 
2.27.0


pgsql-hackers by date:

Previous
From: "tsunakawa.takay@fujitsu.com"
Date:
Subject: RE: Global snapshots
Next
From: Kyotaro Horiguchi
Date:
Subject: Re: In-placre persistance change of a relation