Re: In-placre persistance change of a relation - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: In-placre persistance change of a relation
Date
Msg-id 20220301.141413.1764860719587286375.horikyota.ntt@gmail.com
Whole thread Raw
In response to Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Re: In-placre persistance change of a relation  (Justin Pryzby <pryzby@telsasoft.com>)
List pgsql-hackers
Rebased on a recent xlog refactoring.

No functional changes have been made.

- Removed the default case in smgr_desc since, as far as I can see, we
 don't assume out-of-range values in xlog records elsewhere.

- Simplified some code added to storage.c.

- Fixed copy-and-pasted comments in extractPageInfo().

- The previous version of smgrDoPendingCleanups() assumed that init forks
  are not loaded into shared buffers, but that is wrong
  (SetRelationBuffersPersistence assumes the opposite).  Thus we need
  to drop buffers before unlinking an init fork.  That is already
  guaranteed by the logic, so I rewrote the comment for PCOP_UNLINK_FORK.

  > * Unlink the fork file. Currently we use this only for
  > * init forks and we're sure that the init fork is not
  > * loaded on shared buffers.  For RelationDropInitFork
  > * case, the function dropped that buffers. For
  > * RelationCreateInitFork case, PCOP_SET_PERSISTENCE(true)
  > * is set and the buffers have been dropped just before.
  
  This logic has the same critical window as
  DropRelFilenodeBuffers.  That is, if file deletion fails after the
  buffers have been successfully dropped, the file content of the
  init fork may theoretically be stale.  However, AFAICS the init fork is
  a write-once fork, so I don't think that actually matters.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 420a9d9a0dae3bcfb1396c14997624ad67a3e557 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v18 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, it currently runs a heap rewrite, which causes a large amount of
file I/O.  This patch makes the command run without a heap rewrite.
In addition, SET LOGGED with wal_level > minimal emits WAL using
XLOG_FPI instead of a massive number of HEAP_INSERTs, which should be
smaller.

This also allows cleaning up files left behind by a crash of
the transaction that created them.
---
 src/backend/access/rmgrdesc/smgrdesc.c    |  49 ++
 src/backend/access/transam/README         |   9 +
 src/backend/access/transam/xact.c         |   7 +
 src/backend/access/transam/xlogrecovery.c |  18 +
 src/backend/catalog/storage.c             | 548 +++++++++++++++++++++-
 src/backend/commands/tablecmds.c          | 266 +++++++++--
 src/backend/replication/basebackup.c      |   3 +-
 src/backend/storage/buffer/bufmgr.c       |  86 ++++
 src/backend/storage/file/fd.c             |   4 +-
 src/backend/storage/file/reinit.c         | 344 ++++++++++----
 src/backend/storage/smgr/md.c             |  94 +++-
 src/backend/storage/smgr/smgr.c           |  32 ++
 src/backend/storage/sync/sync.c           |  20 +-
 src/bin/pg_rewind/parsexlog.c             |  22 +
 src/common/relpath.c                      |  47 +-
 src/include/catalog/storage.h             |   3 +
 src/include/catalog/storage_xlog.h        |  42 +-
 src/include/common/relpath.h              |   9 +-
 src/include/storage/bufmgr.h              |   2 +
 src/include/storage/fd.h                  |   1 +
 src/include/storage/md.h                  |   8 +-
 src/include/storage/reinit.h              |  10 +-
 src/include/storage/smgr.h                |  17 +
 23 files changed, 1459 insertions(+), 182 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7547813254..f8908e2c0a 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,46 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+        char       *path = GetRelationPath(xlrec->rnode.dbNode,
+                                           xlrec->rnode.spcNode,
+                                           xlrec->rnode.relNode,
+                                           InvalidBackendId,
+                                           xlrec->forkNum, xlrec->mark);
+        const char *action = "UNKNOWN";    /* never left uninitialized */
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                action = "CREATE";
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                action = "DELETE";
+                break;
+        }
+
+        appendStringInfo(buf, "%s %s", action, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +95,15 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_MARK:
+            id = "MARK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..2ecd8c8c7c 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -725,6 +725,15 @@ then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
 
+The Smgr MARK files
+-------------------
+
+An smgr mark file is created when a new relation file is created, to
+mark that the relfilenode needs to be cleaned up at recovery time.  In
+contrast to the four actions above, failure to remove smgr mark files
+will lead to data loss, in which case the server will shut down.
+
+
 Skipping WAL for New RelFileNode
 --------------------------------
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index adf763a8ea..559666b802 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2198,6 +2198,9 @@ CommitTransaction(void)
      */
     smgrDoPendingSyncs(true, is_parallel_worker);
 
+    /* Likewise delete mark files for files created during this transaction. */
+    smgrDoPendingCleanups(true);
+
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
@@ -2448,6 +2451,9 @@ PrepareTransaction(void)
      */
     smgrDoPendingSyncs(true, false);
 
+    /* Likewise delete mark files for files created during this transaction. */
+    smgrDoPendingCleanups(true);
+
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
@@ -2773,6 +2779,7 @@ AbortTransaction(void)
     AfterTriggerEndXact(false); /* 'false' means it's abort */
     AtAbort_Portals();
     smgrDoPendingSyncs(false, is_parallel_worker);
+    smgrDoPendingCleanups(false);
     AtEOXact_LargeObject(false);
     AtAbort_Notify();
     AtEOXact_RelationMap(false, is_parallel_worker);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index f9f212680b..2923b8ef8c 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage.h"
 #include "commands/tablespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -53,6 +54,7 @@
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/reinit.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -1746,6 +1748,14 @@ PerformWalRecovery(void)
             }
         }
 
+        /* cleanup garbage files left during crash recovery */
+        if (!InArchiveRecovery)
+            ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                   UNLOGGED_RELATION_CLEANUP);
+
+        /* run rollback cleanup if any */
+        smgrDoPendingDeletes(false);
+
         /* Allow resource managers to do any required cleanup. */
         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
         {
@@ -3022,6 +3032,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
             {
                 ereport(DEBUG1,
                         (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+                /* cleanup garbage files left during crash recovery */
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+                /* run rollback cleanup if any */
+                smgrDoPendingDeletes(false);
+
                 InArchiveRecovery = true;
                 if (StandbyModeRequested)
                     StandbyMode = true;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 9b8075536a..cd1445713a 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -66,6 +67,23 @@ typedef struct PendingRelDelete
     struct PendingRelDelete *next;    /* linked-list link */
 } PendingRelDelete;
 
+#define    PCOP_UNLINK_FORK        (1 << 0)    /* unlink fork unlink_forknum */
+#define    PCOP_UNLINK_MARK        (1 << 1)    /* unlink mark file unlink_mark */
+#define    PCOP_SET_PERSISTENCE    (1 << 2)    /* set buffer persistence */
+
+typedef struct PendingCleanup
+{
+    RelFileNode relnode;        /* relation the cleanup applies to */
+    int            op;                /* mask of PCOP_* operations */
+    bool        bufpersistence;    /* persistence to set (PCOP_SET_PERSISTENCE) */
+    int            unlink_forknum;    /* fork to unlink, or fork of the mark */
+    StorageMarks unlink_mark;    /* mark to unlink (PCOP_UNLINK_MARK) */
+    BackendId    backend;        /* InvalidBackendId if not a temp rel */
+    bool        atCommit;        /* T=clean up at commit; F=clean up at abort */
+    int            nestLevel;        /* xact nesting level of request */
+    struct PendingCleanup *next;    /* linked-list link */
+} PendingCleanup;
+
 typedef struct PendingRelSync
 {
     RelFileNode rnode;
@@ -73,6 +91,7 @@ typedef struct PendingRelSync
 } PendingRelSync;
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingCleanup   *pendingCleanups = NULL; /* head of linked list */
 HTAB       *pendingSyncHash = NULL;
 
 
@@ -117,7 +136,8 @@ AddPendingSync(const RelFileNode *rnode)
 SMgrRelation
 RelationCreateStorage(RelFileNode rnode, char relpersistence)
 {
-    PendingRelDelete *pending;
+    PendingRelDelete *pendingdel;
+    PendingCleanup     *pendingclean;
     SMgrRelation srel;
     BackendId    backend;
     bool        needs_wal;
@@ -143,21 +163,41 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
             return NULL;        /* placate compiler */
     }
 
+    /*
+     * We are going to create a new storage file. If the server crashes before
+     * the current transaction ends, the file needs to be cleaned up. The
+     * SMGR_MARK_UNCOMMITTED mark file prompts that work at the next startup.
+     */
     srel = smgropen(rnode, backend);
+    log_smgrcreatemark(&rnode, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
     smgrcreate(srel, MAIN_FORKNUM, false);
 
     if (needs_wal)
         log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
     /* Add the relation to the list of stuff to delete at abort */
-    pending = (PendingRelDelete *)
+    pendingdel = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-    pending->relnode = rnode;
-    pending->backend = backend;
-    pending->atCommit = false;    /* delete if abort */
-    pending->nestLevel = GetCurrentTransactionNestLevel();
-    pending->next = pendingDeletes;
-    pendingDeletes = pending;
+    pendingdel->relnode = rnode;
+    pendingdel->backend = backend;
+    pendingdel->atCommit = false;    /* delete if abort */
+    pendingdel->nestLevel = GetCurrentTransactionNestLevel();
+    pendingdel->next = pendingDeletes;
+    pendingDeletes = pendingdel;
+
+    /* drop mark files at commit */
+    pendingclean = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pendingclean->relnode = rnode;
+    pendingclean->op = PCOP_UNLINK_MARK;
+    pendingclean->unlink_forknum = MAIN_FORKNUM;
+    pendingclean->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pendingclean->backend = backend;
+    pendingclean->atCommit = true;
+    pendingclean->nestLevel = GetCurrentTransactionNestLevel();
+    pendingclean->next = pendingCleanups;
+    pendingCleanups = pendingclean;
 
     if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
     {
@@ -168,6 +208,200 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * If an unlink of the pre-existing init fork is pending, just cancel it.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+    SMgrRelation srel;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+    /*
+     * A pending unlink of the init fork means the init fork existed before
+     * this transaction and RelationDropInitFork scheduled its removal, so
+     * removing that entry reverts the drop.  NOTE(review): entries without
+     * PCOP_UNLINK_FORK may leave unlink_forknum unset -- confirm zeroed.
+     */
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->unlink_forknum == INIT_FORKNUM)
+        {
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            pfree(pending);
+            /* prev does not change */
+
+            create = false;
+        }
+        else
+            prev = pending;
+    }
+
+    if (!create)
+        return;
+
+    /* create the init fork, along with the commit-sentinel file */
+    srel = smgropen(rnode, InvalidBackendId);
+    log_smgrcreatemark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+    /* We don't have existing init fork, create it. */
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * init fork for indexes needs further initialization. ambuildempty should
+     * do WAL-log and file sync by itself but otherwise we do that by
+     * ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /* drop the init fork, mark file and revert persistence at abort */
+    pending = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+
+    /* drop mark file at commit */
+    pending = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_MARK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingCleanups at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion is canceled.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+    bool            inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+    /*
+     * If we have a pending-unlink for the init-fork of this relation, that
+     * means the init fork is created in the current transaction.  We remove
+     * both the init fork and mark file immediately in that case.  Otherwise
+     * just register a pending-unlink for the existing init fork.  See
+     * RelationCreateInitFork.
+     */
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->unlink_forknum == INIT_FORKNUM)
+        {
+            /* unlink list entry */
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            pfree(pending);
+            /* prev does not change */
+
+            inxact_created = true;
+        }
+        else
+            prev = pending;
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+        /*
+         * SetRelationBuffersPersistence() above has already dropped the init
+         * fork's buffers, so its file can be removed right away.
+         */
+        log_smgrunlinkmark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+        smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingCleanup *)
+        MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+
+    /* revert buffer persistence at abort (zeroed: scans read unlink_forknum) */
+    pending = (PendingCleanup *)
+        MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +421,88 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the unlink of one fork of the relation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (action MARK_CREATE) to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the mark file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (action MARK_UNLINK) to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the mark file deletion.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the new buffer persistence of the relation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -673,6 +989,95 @@ smgrDoPendingDeletes(bool isCommit)
     }
 }
 
+/*
+ *    smgrDoPendingCleanups() -- Clean up work that emits WAL records
+ *
+ *  The operations handled in this function emit WAL records, which must be
+ *  part of the current transaction.
+ */
+void
+smgrDoPendingCleanups(bool isCommit)
+{
+    int            nestLevel = GetCurrentTransactionNestLevel();
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (pending->nestLevel < nestLevel)
+        {
+            /* outer-level entries should not be processed yet */
+            prev = pending;
+        }
+        else
+        {
+            /* unlink list entry first, so we don't retry on failure */
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            /* do cleanup if called for */
+            if (pending->atCommit == isCommit)
+            {
+                SMgrRelation srel;
+
+                srel = smgropen(pending->relnode, pending->backend);
+
+                Assert ((pending->op &
+                         ~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+                           PCOP_SET_PERSISTENCE)) == 0);
+
+                if (pending->op & PCOP_SET_PERSISTENCE)
+                {
+                    SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                                  InRecovery);
+                }
+
+                if (pending->op & PCOP_UNLINK_FORK)
+                {
+                    /*
+                     * Unlink the fork file. Currently we use this only for
+                     * init forks and we're sure that the init fork is not
+                     * loaded on shared buffers.  For RelationDropInitFork
+                     * case, the function dropped that buffers. For
+                     * RelationCreateInitFork case, PCOP_SET_PERSISTENCE(true)
+                     * is set and the buffers have been dropped just before.
+                     */
+                    Assert(pending->unlink_forknum == INIT_FORKNUM);
+
+                    /* Don't emit WAL during recovery. */
+                    if (!InRecovery)
+                        log_smgrunlink(&pending->relnode,
+                                       pending->unlink_forknum);
+                    smgrunlink(srel, pending->unlink_forknum, false);
+                }
+
+                if (pending->op & PCOP_UNLINK_MARK)
+                {
+                    /*
+                     * Reuse srel opened above; don't emit WAL during recovery.
+                     */
+                    if (!InRecovery)
+                        log_smgrunlinkmark(&pending->relnode,
+                                           pending->unlink_forknum,
+                                           pending->unlink_mark);
+                    smgrunlinkmark(srel, pending->unlink_forknum,
+                                   pending->unlink_mark, InRecovery);
+                    smgrclose(srel);
+                }
+            }
+
+            /* must explicitly free the list entry */
+            pfree(pending);
+            /* prev does not change */
+        }
+    }
+}
+
 /*
  *    smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
  */
@@ -933,6 +1338,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrunlink(reln, xlrec->forkNum, true);
+        smgrclose(reln);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1021,6 +1435,124 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingCleanup *pending;
+        bool        created = false;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+                created = true;
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+                break;
+            default:
+                elog(ERROR, "unknown smgr_mark action %d", (int) xlrec->action);
+        }
+
+        if (created)
+        {
+            /* unlink the mark at abort unless a MARK_UNLINK record cancels it */
+            pending = (PendingCleanup *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+            pending->relnode = xlrec->rnode;
+            pending->op = PCOP_UNLINK_MARK;
+            pending->unlink_forknum = xlrec->forkNum;
+            pending->unlink_mark = xlrec->mark;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingCleanups;
+            pendingCleanups = pending;
+        }
+        else
+        {
+            /*
+             * Delete pending action for this mark file if any. We should have
+             * at most one entry for this action.
+             */
+            PendingCleanup *prev = NULL;
+
+            for (pending = pendingCleanups; pending != NULL;
+                 pending = pending->next)
+            {
+                if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+                    pending->unlink_forknum == xlrec->forkNum &&
+                    (pending->op & PCOP_UNLINK_MARK) != 0)
+                {
+                    if (prev)
+                        prev->next = pending->next;
+                    else
+                        pendingCleanups = pending->next;
+
+                    pfree(pending);
+                    break;
+                }
+
+                prev = pending;
+            }
+        }
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingCleanup *pending;
+        PendingCleanup *prev = NULL;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+        /*
+         * Delete pending action for persistence change if any. We should have
+         * at most one entry for this action.
+         */
+        for (pending = pendingCleanups; pending != NULL;
+             pending = pending->next)
+        {
+            if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+                (pending->op & PCOP_SET_PERSISTENCE) != 0)
+            {
+                Assert (pending->bufpersistence == xlrec->persistence);
+
+                if (prev)
+                    prev->next = pending->next;
+                else
+                    pendingCleanups = pending->next;
+
+                pfree(pending);
+                break;
+            }
+
+            prev = pending;
+        }
+
+        /*
+         * No entry was cancelled, so persistence now differs from its state
+         * before this transaction: register a revert to the opposite at abort.
+         */
+        if (!pending)
+        {
+            pending = (PendingCleanup *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+            pending->relnode = xlrec->rnode;
+            pending->op = PCOP_SET_PERSISTENCE;
+            pending->bufpersistence = !xlrec->persistence;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingCleanups;
+            pendingCleanups = pending;
+        }
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3e83f375b5..9e5b77e94a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -53,6 +53,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5347,6 +5348,187 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ *
+ * Switches the table tab->relid, its indexes, its toast table (if any) and
+ * the toast table's indexes to the given persistence without rewriting the
+ * heap.  For each relation an init fork is created (to UNLOGGED) or dropped
+ * (to PERMANENT) in place, and pg_class.relpersistence is updated.  Indexes
+ * whose access method is not in the known-safe list below are rebuilt with
+ * reindex_index() instead.
+ *
+ * When switching to PERMANENT, every existing fork numbered below
+ * INIT_FORKNUM is synced to disk and, if XLogIsNeeded(), WAL-logged in full
+ * after a truncate-all record.
+ *
+ * tab:         ALTER TABLE work item; rewrite must be AT_REWRITE_ALTER_PERSISTENCE
+ * persistence: target RELPERSISTENCE_* value
+ * lockmode:    must be AccessExclusiveLock
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation    rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    ForkNumber    forknum;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * None of the following may be set here: any of them would require
+     * ATRewriteTable to rewrite the heap, which this in-place path never
+     * does.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform in-place persistence change");
+
+    /*
+     * First collect all relations whose persistence must change: the table
+     * itself, its indexes and, if present, its toast table together with the
+     * toast table's indexes.
+     */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(RelationGetRelid(rel), relids);
+
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        Oid            toastrelid = rel->rd_rel->reltoastrelid;
+        Relation    toastrel = table_open(toastrelid, lockmode);
+        List       *toastidx = RelationGetIndexList(toastrel);
+
+        relids = lappend_oid(relids, toastrelid);
+        /* list_concat copies toastidx's cells, so it can be freed after */
+        relids = list_concat(relids, toastidx);
+        list_free(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, NoLock);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach(lc_oid, relids)
+    {
+        Oid            reloid = lfirst_oid(lc_oid);
+        Relation    r = relation_open(reloid, lockmode);
+
+        /*
+         * XXX: Some access methods cannot tolerate an in-place persistence
+         * change.  Specifically, GiST uses page LSNs to figure out whether a
+         * block has changed, whereas UNLOGGED GiST indexes use fake LSNs that
+         * are incompatible with the real LSNs used for LOGGED ones.
+         *
+         * Maybe if gistGetFakeLSN behaved the same way for permanent and
+         * unlogged indexes, we could skip the index rebuild in exchange for
+         * some extra WAL records emitted while it is unlogged.
+         *
+         * Check relam against a positive list so that unknown AMs take the
+         * rebuild path.
+         */
+        if (r->rd_rel->relkind == RELKIND_INDEX &&
+            /* GiST is excluded */
+            r->rd_rel->relam != BTREE_AM_OID &&
+            r->rd_rel->relam != HASH_AM_OID &&
+            r->rd_rel->relam != GIN_AM_OID &&
+            r->rd_rel->relam != SPGIST_AM_OID &&
+            r->rd_rel->relam != BRIN_AM_OID)
+        {
+            ReindexParams params = {0};
+
+            /* reindex doesn't allow concurrent use of the index */
+            relation_close(r, NoLock);
+
+            /*
+             * Rebuild the index with the same persistence as the parent
+             * relation.  reindex_index() takes the persistence directly and
+             * a bool skip_constraint_checks; pass false so that unique and
+             * exclusion constraints are checked, which is what
+             * reindex_relation() does for REINDEX_REL_CHECK_CONSTRAINTS.
+             */
+            reindex_index(reloid, false, persistence, &params);
+
+            continue;
+        }
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation becomes WAL-logged, immediately sync all forks
+         * but the init fork to establish the initial state on storage.
+         * Buffers have already been flushed out by RelationCreateInitFork /
+         * RelationDropInitFork called just above, and the init fork should
+         * have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (forknum = 0; forknum < INIT_FORKNUM; forknum++)
+            {
+                if (smgrexists(RelationGetSmgr(r), forknum))
+                    smgrimmedsync(RelationGetSmgr(r), forknum);
+            }
+        }
+
+        /* Update pg_class.relpersistence of this relation */
+        tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+        /* SearchSysCacheCopy1 returned a palloc'd copy; release it too */
+        heap_freetuple(tuple);
+
+        /*
+         * While wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged to recover the table.  We don't
+         * emit these records while wal_level = minimal.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            xl_smgr_truncate xlrec;
+
+            /*
+             * Log a truncate-to-zero of all forks first, so that redo starts
+             * from empty forks before replaying the page images logged below.
+             */
+            xlrec.blkno = 0;
+            xlrec.rnode = r->rd_node;
+            xlrec.flags = SMGR_TRUNCATE_ALL;
+
+            XLogBeginInsert();
+            XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+            XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+            for (forknum = 0; forknum < INIT_FORKNUM; forknum++)
+            {
+                if (smgrexists(RelationGetSmgr(r), forknum))
+                    log_newpage_range(r, forknum, 0,
+                                      smgrnblocks(RelationGetSmgr(r), forknum),
+                                      false);
+            }
+        }
+
+        relation_close(r, NoLock);
+    }
+
+    list_free(relids);
+    table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5477,47 +5659,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-                                       persistence, lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+                                           NewAccessMethod,
+                                           persistence, lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
 
-            InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+                InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+            }
         }
         else
         {
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 0bf28b55d7..17185f4e55 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1209,6 +1209,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         bool        excludeFound;
         ForkNumber    relForkNum; /* Type of fork if file is a relation */
         int            relOidChars;    /* Chars in filename that are the rel oid */
+        StorageMarks mark;
 
         /* Skip special stuff */
         if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1259,7 +1260,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         /* Exclude all forks for unlogged tables except the init fork */
         if (isDbDir &&
             parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
-                                                &relForkNum))
+                                                &relForkNum, &mark))
         {
             /* Never exclude init forks */
             if (relForkNum != INIT_FORKNUM)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f5459c68f8..6cd010429a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -38,6 +38,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3155,6 +3156,91 @@ DropRelFileNodeBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelationBuffersPersistence
+ *
+ *        Adjusts the shared buffers of a relation for a persistence change.
+ *
+ *        When switching to PERMANENT, this sets BM_PERMANENT on every buffer
+ *        of the relation, drops the buffers of the init fork (which is being
+ *        removed), and writes all valid dirty pages of the relation out to
+ *        disk (or, more accurately, out to kernel disk buffers), ensuring
+ *        that the kernel has an up-to-date view of the relation.
+ *
+ *        When switching away from PERMANENT, buffer flags are left as they
+ *        are; only (in assert-enabled builds) the absence of init-fork
+ *        buffers is checked.  NOTE(review): BM_PERMANENT is not cleared on
+ *        this path -- confirm this is intended.
+ *
+ *        srel:      relation whose buffers are processed (must not be temp)
+ *        permanent: true when the relation is becoming PERMANENT
+ *        isRedo:    when false, the change is WAL-logged first via
+ *                   log_smgrbufpersistence(); when true (redo), nothing is
+ *                   logged
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently it sequentially searches the buffer pool, should be
+ *        changed to more clever ways of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert (!RelFileNodeBackendIsTemp(rnode));
+
+    /* WAL-log the persistence change before touching any buffer */
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    /* At most one buffer is pinned at a time in the loop below */
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        /*
+         * Cheap unlocked pre-check to skip buffers of other relations; the
+         * tag is re-checked below once the header lock is held.
+         */
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        /*
+         * PinBuffer_Locked() below is called with the header lock held, so
+         * reserve a private refcount entry before taking that lock.
+         */
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                /* InvalidateBuffer() consumes the header lock held here */
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            /* Store the new flag while the header is still locked */
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                /* PinBuffer_Locked() releases the header lock */
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* There shouldn't be an init fork */
+            Assert(bufHdr->tag.forkNum != INIT_FORKNUM);
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 14b77f2861..2fc9f17c28 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -349,8 +349,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int    fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3759,7 +3757,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
     char        parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index f053fe0495..f28f55baa6 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,49 @@
 
 #include <unistd.h>
 
+#include "access/xlogrecovery.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-                                                  int op);
+                                                  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-                                               int op);
+                                               Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
     Oid            reloid;            /* hash key */
-} unlogged_relation_entry;
+    bool        has_init;        /* an INIT fork file exists */
+    /* INIT fork has an SMGR_MARK_UNCOMMITTED mark: remove the INIT fork */
+    bool        dirty_init;
+    /* MAIN fork has an SMGR_MARK_UNCOMMITTED mark: remove all forks */
+    bool        dirty_all;
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the init fork is present, we
+ * remove the init fork along with the mark file.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the main fork is present, we
+ * remove the whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of the
+ * relation except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +92,7 @@ ResetUnloggedRelations(int op)
     /*
      * First process unlogged files in pg_default ($PGDATA/base)
      */
-    ResetUnloggedRelationsInTablespaceDir("base", op);
+    ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
     /*
      * Cycle through directories for all non-default tablespaces.
@@ -81,13 +101,19 @@ ResetUnloggedRelations(int op)
 
     while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
     {
+        Oid tspid;
+
         if (strcmp(spc_de->d_name, ".") == 0 ||
             strcmp(spc_de->d_name, "..") == 0)
             continue;
 
         snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
                  spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-        ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+        tspid = atooid(spc_de->d_name);
+
+        Assert(tspid != 0);
+        ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
     }
 
     FreeDir(spc_dir);
@@ -103,7 +129,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+                                      Oid tspid, int op)
 {
     DIR           *ts_dir;
     struct dirent *de;
@@ -130,6 +157,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
     while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
     {
+        Oid dbid;
+
         /*
          * We're only interested in the per-database directories, which have
          * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +177,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
             ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path:
%s",
                                      dbspace_path);
 
-        ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+        dbid = atooid(de->d_name);
+        Assert(dbid != 0);
+
+        ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
     }
 
     FreeDir(ts_dir);
@@ -158,125 +190,227 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+                                   Oid tspid, Oid dbid, int op)
 {
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
-    Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+    Assert((op & (UNLOGGED_RELATION_CLEANUP |
+                  UNLOGGED_RELATION_DROP_BUFFER |
+                  UNLOGGED_RELATION_INIT)) != 0);
 
     /*
      * Cleanup is a two-pass operation.  First, we go through and identify all
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create tons of unlogged relations in
+     * the same database & tablespace, so we'd better use a hash table rather
+     * than an array or linked list to keep track of which files need to be
+     * reset.  Otherwise, this cleanup operation would be O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(Oid);
+    ctl.entrysize = sizeof(relfile_entry);
+    hash = hash_create("relfilenode cleanup hash",
+                       32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+    /* Collect INIT fork and mark files in the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        int            oidchars;
+        ForkNumber    forkNum;
+        StorageMarks mark;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum, &mark))
+            continue;
+
+        if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+        {
+            Oid                key;
+            relfile_entry  *ent;
+            bool            found;
+
+            /*
+             * Record the relfilenode information. If it has
+             * SMGR_MARK_UNCOMMITTED mark files, the relfilenode is in dirty
+             * state, where clean up is needed.
+             */
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+            if (!found)
+            {
+                ent->has_init = false;
+                ent->dirty_init = false;
+                ent->dirty_all = false;
+            }
+
+            if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_init = true;
+            else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_all = true;
+            else
+            {
+                Assert(forkNum == INIT_FORKNUM);
+                ent->has_init = true;
+            }
+        }
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /* Nothing to do if there are neither init nor cleanup forks. */
+    if (hash_get_num_entries(hash) < 1)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
+    if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+    {
+        /*
+         * When we come here after recovery, the smgr object for this file
+         * might have been created.  In that case we need to drop all buffers
+         * and then the smgr object before initializing the unlogged relation.
+         * This is safe as long as no other backends have accessed the
+         * relation before starting archive recovery.
+         */
+        HASH_SEQ_STATUS status;
+        relfile_entry *ent;
+        SMgrRelation   *srels = palloc(sizeof(SMgrRelation) * 8);
+        int               maxrels = 8;
+        int               nrels = 0;
+        int i;
+
+        Assert(!HotStandbyActive());
+
+        hash_seq_init(&status, hash);
+        while((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+        {
+            RelFileNodeBackend rel;
+
+            /*
+             * The relation is persistent and remains persistent.  Don't
+             * drop the buffers for this relation.
+             */
+            if (ent->has_init && ent->dirty_init)
+                continue;
+
+            if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = ent->reloid;
+
+            srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+        }
+
+        DropRelFileNodesAllBuffers(srels, nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            smgrclose(srels[i]);
+    }
+
+    /*
+     * Now, make a second pass and remove anything that matches.
+     */
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        ctl.keysize = sizeof(Oid);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        ctl.hcxt = CurrentMemoryContext;
-        hash = hash_create("unlogged relation OIDs", 32, &ctl,
-                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-        /* Scan the directory. */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
+            ForkNumber        forkNum;
+            StorageMarks    mark;
+            int                oidchars;
+            Oid                key;
+            relfile_entry  *ent;
+            RelFileNodeBackend rel;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            ent.reloid = atooid(de->d_name);
-            (void) hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
-        /*
-         * Now, make a second pass and remove anything that matches.
-         */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
+                                                     &forkNum, &mark))
                 continue;
 
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.  If so, nuke it!
              */
-            ent.reloid = atooid(de->d_name);
-            if (hash_search(hash, &ent, HASH_FIND, NULL))
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            if (!ent)
+                continue;
+
+            if (!ent->dirty_all)
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
+                /* clean permanent relations don't need cleanup */
+                if (!ent->has_init)
+                    continue;
+
+                if (ent->dirty_init)
+                {
+                    /*
+                     * The crashed transaction did SET UNLOGGED. This relation
+                     * is restored to a LOGGED relation.
+                     */
+                    if (forkNum != INIT_FORKNUM)
+                        continue;
+                }
                 else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                {
+                    /*
+                     * we don't remove the INIT fork of a non-dirty
+                     * relfilenode
+                     */
+                    if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+                        continue;
+                }
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = atooid(de->d_name);
+
+            ForgetRelationForkSyncRequests(rel, forkNum);
         }
 
         /* Cleanup is complete. */
         FreeDir(dbspace_dir);
-        hash_destroy(hash);
     }
 
+    hash_destroy(hash);
+    hash = NULL;
+
     /*
      * Initialization happens after cleanup is complete: we copy each init
-     * fork file to the corresponding main fork file.  Note that if we are
-     * asked to do both cleanup and init, we may never get here: if the
-     * cleanup code determines that there are no init forks in this dbspace,
-     * it will return before we get to this point.
+     * fork file to the corresponding main fork file.
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
@@ -285,6 +419,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        srcpath[MAXPGPATH * 2];
@@ -292,9 +427,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -328,15 +465,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        mainpath[MAXPGPATH];
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -379,7 +519,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *oidchars,
-                                    ForkNumber *fork)
+                                    ForkNumber *fork, StorageMarks *mark)
 {
     int            pos;
 
@@ -410,11 +550,19 @@ parse_filename_for_nontemp_relation(const char *name, int *oidchars,
 
         for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
             ;
-        if (segchar <= 1)
-            return false;
-        pos += segchar;
+        if (segchar > 1)
+            pos += segchar;
     }
 
+    /* mark file? */
+    if (name[pos] == '.' && name[pos + 1] != 0)
+    {
+        *mark = name[pos + 1];
+        pos += 2;
+    }
+    else
+        *mark = SMGR_MARK_NONE;
+
     /* Now we should be at the end. */
     if (name[pos] != '\0')
         return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 879f647dbc..4d44bdd78b 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -139,7 +139,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
                              BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
                               MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+                         StorageMarks mark);
 
 /*
  *    mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -169,6 +170,82 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
     return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * Creates the (empty) mark file for the given fork and mark, then fsyncs the
+ * new file and its parent directory so that the creation is durable.
+ *
+ * If isRedo is true, it's okay for the file to exist already; in that case
+ * the existing file is left untouched and only the parent directory is
+ * fsync'ed.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    /* See mdcreate for details. */
+    TablespaceCreateDbspace(reln->smgr_rnode.node.spcNode,
+                            reln->smgr_rnode.node.dbNode,
+                            isRedo);
+
+    fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+    if (fd < 0)
+    {
+        /* During redo the file may already have reached disk; that's fine. */
+        if (!isRedo || errno != EEXIST)
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                     errmsg("could not create mark file \"%s\": %m", path)));
+    }
+    else
+    {
+        int            rc;
+        int            save_errno;
+
+        /*
+         * Only a descriptor we actually obtained is fsync'ed and closed.  It
+         * is a raw descriptor, so close it before reporting an fsync failure
+         * to avoid leaking it, preserving errno for the %m in the message.
+         */
+        rc = pg_fsync(fd);
+        save_errno = errno;
+        close(fd);
+        if (rc != 0)
+        {
+            errno = save_errno;
+            ereport(data_sync_elevel(ERROR),
+                    (errcode_for_file_access(),
+                     errmsg("could not fsync file \"%s\": %m", path)));
+        }
+    }
+
+    /*
+     * To guarantee that the creation of the file is persistent, fsync its
+     * parent directory.
+     */
+    fsync_parent_path(path, ERROR);
+
+    pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark() -- Durably remove a mark file.
+ *
+ * Outside of redo the mark file is expected to be present and is passed to
+ * durable_unlink() unconditionally, with failures reported at ERROR.  During
+ * redo (isRedo = true) a missing mark file is tolerated: the unlink is only
+ * attempted when mdmarkexists() finds the file.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *markfile = markpath(reln->smgr_rnode, forkNum, mark);
+    bool    must_unlink = true;
+
+    if (isRedo)
+        must_unlink = mdmarkexists(reln, forkNum, mark);
+
+    if (must_unlink)
+        durable_unlink(markfile, ERROR);
+
+    pfree(markfile);
+}
+
+/*
+ *  mdmarkexists()  -- Report whether a mark file is present.
+ *
+ * Returns true if the mark file for the given fork and mark can be opened,
+ * false if opening it fails with ENOENT.  Any other failure to open it is
+ * reported at ERROR.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+    char   *markfile = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd = BasicOpenFile(markfile, O_RDONLY);
+
+    if (fd >= 0)
+    {
+        /* Existence is all we wanted to know; drop the handle again. */
+        pfree(markfile);
+        close(fd);
+        return true;
+    }
+
+    if (errno != ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not access mark file \"%s\": %m", markfile)));
+
+    pfree(markfile);
+    return false;
+}
+
 /*
  *    mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1031,6 +1108,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
     RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget pending sync requests for a fork
+ *
+ * Registers a forget request for the given relation fork through
+ * register_forget_request().
+ *
+ * NOTE(review): the third argument (presumably the segment number) is fixed
+ * at 0, so this likely cancels only the first segment's request rather than
+ * "any" request for the fork; callers that unlink further segment files may
+ * need to forget those separately -- confirm against register_forget_request.
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+    register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1384,12 +1470,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
     char       *p;
 
     /* Compute the path. */
-    p = relpathperm(ftag->rnode, MAIN_FORKNUM);
+    p = GetRelationPath(ftag->rnode.dbNode, ftag->rnode.spcNode,
+                        ftag->rnode.relNode, InvalidBackendId, MAIN_FORKNUM,
+                        mark);
     strlcpy(path, p, MAXPGPATH);
     pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index d71a557a35..0710e8b145 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -63,6 +63,10 @@ typedef struct f_smgr
     void        (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
                                   BlockNumber nblocks);
     void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+    void        (*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
+    void        (*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -84,6 +88,8 @@ static const f_smgr smgrsw[] = {
         .smgr_nblocks = mdnblocks,
         .smgr_truncate = mdtruncate,
         .smgr_immedsync = mdimmedsync,
+        .smgr_createmark = mdcreatemark,
+        .smgr_unlinkmark = mdunlinkmark,
     }
 };
 
@@ -337,6 +343,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
     smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *    smgrcreatemark() -- Create a mark file
+ *
+ * Dispatches to the smgr_createmark callback of reln's storage manager.
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    const f_smgr *sw = &smgrsw[reln->smgr_which];
+
+    sw->smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *    smgrunlinkmark() -- Delete a mark file
+ *
+ * Dispatches to the smgr_unlinkmark callback of reln's storage manager.
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    const f_smgr *sw = &smgrsw[reln->smgr_which];
+
+    sw->smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *    smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -664,6 +690,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+/*
+ *    smgrunlink() -- Unlink one fork of a relation
+ *
+ * Dispatches to the smgr_unlink callback of reln's storage manager, passing
+ * reln's RelFileNodeBackend (smgr_rnode) rather than the SMgrRelation.
+ */
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    const f_smgr *sw = &smgrsw[reln->smgr_which];
+
+    sw->smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index e161d57761..f5ded7cb34 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -90,7 +90,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
     int            (*sync_syncfiletag) (const FileTag *ftag, char *path);
-    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+                                       StorageMarks mark);
     bool        (*sync_filetagmatches) (const FileTag *ftag,
                                         const FileTag *candidate);
 } SyncOps;
@@ -223,7 +224,8 @@ SyncPostCheckpoint(void)
 
         /* Unlink the file */
         if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-                                                          path) < 0)
+                                                          path,
+                                                          SMGR_MARK_NONE) < 0)
         {
             /*
              * There's a race condition, when the database is dropped at the
@@ -237,6 +239,20 @@ SyncPostCheckpoint(void)
                         (errcode_for_file_access(),
                          errmsg("could not remove file \"%s\": %m", path)));
         }
+        else if (syncsw[entry->tag.handler].sync_unlinkfiletag(
+                     &entry->tag, path,
+                     SMGR_MARK_UNCOMMITTED) < 0)
+        {
+            /*
+             * We may also have an SMGR_MARK_UNCOMMITTED file.  Remove it once
+             * the fork file has been successfully removed.  It's OK if the
+             * mark file does not exist.
+             */
+            if (errno != ENOENT)
+                ereport(WARNING,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m", path)));
+        }
 
         /* Mark the list entry as canceled, just in case */
         entry->canceled = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 56df08c64f..f1382d4c4f 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -407,6 +407,28 @@ extractPageInfo(XLogReaderState *record)
          * source system.
          */
     }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+    {
+        /*
+         * We can safely ignore these.  We'll see that the files don't exist
+         * in the target data dir, and copy them in from the source system.
+         * No need to do anything special here.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+    {
+        /*
+         * We can safely ignore these.  The files will be removed from the
+         * target if they don't exist in the source system.  The files are
+         * empty, so we don't need to bother about their content.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        /*
+         * We can safely ignore these. These don't make any on-disk changes.
+         */
+    }
     else if (rmid == RM_XACT_ID &&
              ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
               (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 636c96efd3..1c19e16fea 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbNode, Oid spcNode)
  */
 char *
 GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                int backendId, ForkNumber forkNumber)
+                int backendId, ForkNumber forkNumber, char mark)
 {
     char       *path;
+    char        markstr[4];
+
+    if (mark == 0)
+        markstr[0] = 0;
+    else
+        snprintf(markstr, sizeof(markstr), ".%c", mark);
 
     if (spcNode == GLOBALTABLESPACE_OID)
     {
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         Assert(dbNode == 0);
         Assert(backendId == InvalidBackendId);
         if (forkNumber != MAIN_FORKNUM)
-            path = psprintf("global/%u_%s",
-                            relNode, forkNames[forkNumber]);
+            path = psprintf("global/%u_%s%s",
+                            relNode, forkNames[forkNumber], markstr);
         else
-            path = psprintf("global/%u", relNode);
+            path = psprintf("global/%u%s", relNode, markstr);
     }
     else if (spcNode == DEFAULTTABLESPACE_OID)
     {
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/%u_%s",
+                path = psprintf("base/%u/%u_%s%s",
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/%u",
-                                dbNode, relNode);
+                path = psprintf("base/%u/%u%s",
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/t%d_%u_%s",
+                path = psprintf("base/%u/t%d_%u_%s%s",
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/t%d_%u",
-                                dbNode, backendId, relNode);
+                path = psprintf("base/%u/t%d_%u%s",
+                                dbNode, backendId, relNode, markstr);
         }
     }
     else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, relNode);
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, backendId, relNode);
+                                dbNode, backendId, relNode, markstr);
         }
     }
+
     return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 9ffc741913..d362d62ed2 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
@@ -41,6 +43,7 @@ extern void RestorePendingSyncs(char *startAddress);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern void smgrDoPendingSyncs(bool isCommit, bool isParallelWorker);
 extern int    smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrDoPendingCleanups(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 622de22b03..8139308634 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence changes here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, init forks can also be
+ * deleted outside the control of a transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_MARK        0x40
+#define XLOG_SMGR_BUFPERSISTENCE    0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,32 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+/*
+ * Payload of an XLOG_SMGR_UNLINK record: identifies one fork of a relation
+ * file node.  Presumably filled in by log_smgrunlink(), whose arguments match
+ * these fields.
+ */
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+/*
+ * Action carried by an XLOG_SMGR_MARK record.  The enumerator values are
+ * single characters.
+ */
+typedef enum smgr_mark_action
+{
+    XLOG_SMGR_MARK_CREATE = 'c',    /* create the mark file */
+    XLOG_SMGR_MARK_UNLINK = 'u'        /* remove the mark file */
+} smgr_mark_action;
+
+/*
+ * Payload of an XLOG_SMGR_MARK record: create or remove (per "action") the
+ * storage mark file "mark" attached to one fork of a relation file node.
+ * Presumably written by log_smgrcreatemark() / log_smgrunlinkmark().
+ */
+typedef struct xl_smgr_mark
+{
+    RelFileNode     rnode;
+    ForkNumber        forkNum;
+    StorageMarks    mark;
+    smgr_mark_action action;
+} xl_smgr_mark;
+
+/*
+ * Payload of an XLOG_SMGR_BUFPERSISTENCE record.
+ *
+ * NOTE(review): "persistence" presumably means "the relation's buffers are
+ * permanent (logged)" when true, mirroring the "permanent" argument of
+ * SetRelationBuffersPersistence() -- confirm against smgr_redo().
+ */
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +83,12 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a4b5dc853b..a864c91614 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -67,7 +67,7 @@ extern int    forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbNode, Oid spcNode);
 
 extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                             int backendId, ForkNumber forkNumber);
+                             int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -77,7 +77,7 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 /* First argument is a RelFileNode */
 #define relpathbackend(rnode, backend, forknum) \
     GetRelationPath((rnode).dbNode, (rnode).spcNode, (rnode).relNode, \
-                    backend, forknum)
+                    backend, forknum, 0)
 
 /* First argument is a RelFileNode */
 #define relpathperm(rnode, forknum) \
@@ -87,4 +87,9 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 #define relpath(rnode, forknum) \
     relpathbackend((rnode).node, (rnode).backend, forknum)
 
+/*
+ * Path of the storage mark file "mark" for fork "forknum" of a relation.
+ * First argument is a RelFileNodeBackend; it is evaluated several times.
+ */
+#define markpath(rnode, forknum, mark) \
+    GetRelationPath((rnode).node.dbNode, (rnode).node.spcNode, \
+                    (rnode).node.relNode, (rnode).backend, \
+                    forknum, mark)
 #endif                            /* RELPATH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index dd01841c30..739b386216 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -206,6 +206,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(struct SMgrRelationData **smgr_reln, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 29209e2724..8bf746bf45 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -185,6 +185,7 @@ extern ssize_t pg_pwritev_with_retry(int fd,
 extern int    pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int    fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int    fsync_parent_path(const char *fname, int elevel);
 extern int    durable_rename(const char *oldfile, const char *newfile, int loglevel);
 extern int    durable_unlink(const char *fname, int loglevel);
 extern int    durable_rename_excl(const char *oldfile, const char *newfile, int loglevel);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 6e46d8d96a..ef5fdaf4f8 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -24,6 +24,10 @@ extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdrelease(void);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool    isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
@@ -42,12 +46,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+                                           ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int    mdsyncfiletag(const FileTag *ftag, char *path);
-extern int    mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int    mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif                            /* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index bf2c10d443..e399aec0c7 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,13 +16,15 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
-extern bool parse_filename_for_nontemp_relation(const char *name,
-                                                int *oidchars, ForkNumber *fork);
+extern bool parse_filename_for_nontemp_relation(const char *name, int *oidchars,
+                                                ForkNumber *fork,
+                                                StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP        0x0001
-#define UNLOGGED_RELATION_INIT            0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
+#define UNLOGGED_RELATION_INIT            0x0004
 
 #endif                            /* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 8e3ef92cda..022654b7b2 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
+/*
+ * A storage mark is a file whose mere existence records something about
+ * another relation file.  A mark file is named "<filename>.<mark>", where
+ * <mark> is one of the characters below.  Since ".<digit>" already denotes a
+ * segment file, digits must never be used as mark characters.
+ */
+typedef enum StorageMarks
+{
+    SMGR_MARK_NONE = 0,            /* no mark: the relation file itself */
+    SMGR_MARK_UNCOMMITTED = 'u'    /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -85,7 +97,12 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.27.0

From d7caa6b33f364ad1a88a8f74306a255e607a6639 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v18 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 doc/src/sgml/ref/alter_table.sgml        |  15 +++
 src/backend/commands/tablecmds.c         | 140 +++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c            |  16 +++
 src/backend/nodes/equalfuncs.c           |  15 +++
 src/backend/parser/gram.y                |  42 +++++++
 src/backend/tcop/utility.c               |  11 ++
 src/include/commands/tablecmds.h         |   2 +
 src/include/nodes/nodes.h                |   1 +
 src/include/nodes/parsenodes.h           |  10 ++
 src/test/regress/expected/tablespace.out |  76 ++++++++++++
 src/test/regress/sql/tablespace.sql      |  41 +++++++
 11 files changed, 369 insertions(+)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index 5c0735e08a..b03d5511a6 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -33,6 +33,8 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
 ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable
class="parameter">role_name</replaceable>[, ... ] ]
 
     SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable> [ NOWAIT ]
+ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable
class="parameter">role_name</replaceable>[, ... ] ]
 
+    SET { LOGGED | UNLOGGED } [ NOWAIT ]
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable
class="parameter">partition_bound_spec</replaceable>| DEFAULT }
 
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
@@ -753,6 +755,19 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       (see <xref linkend="sql-createtable-unlogged"/>).  It cannot be applied
       to a temporary table.
      </para>
+
+     <para>
+      All tables in the current database in a tablespace can be changed by using
+      the <literal>ALL IN TABLESPACE</literal> form, which will lock all tables
+      to be changed first and then change each one.  This form also supports
+      <literal>OWNED BY</literal>, which will only change tables owned by the
+      roles specified.  If the <literal>NOWAIT</literal> option is specified
+      then the command will fail if it is unable to acquire all of the locks
+      required immediately.  The <literal>information_schema</literal>
+      relations are not considered part of the system catalogs and will be
+      changed.  See also
+      <link linkend="sql-createtablespace"><command>CREATE TABLESPACE</command></link>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 9e5b77e94a..0724d0e1d2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14770,6 +14770,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * ALTER TABLE ALL IN TABLESPACE ... SET LOGGED/UNLOGGED
+ *
+ * Change the persistence of all plain tables in the given tablespace in the
+ * current database.  With OWNED BY, only tables owned by one of the listed
+ * roles are considered, so users can change just their own tables.  Tables
+ * in pg_catalog, as well as shared, temporary and TOAST relations, are
+ * skipped.  Ownership is checked up front and again by AlterTableInternal().
+ *
+ * All to-be-modified tables are locked first.  If NOWAIT is specified and a
+ * lock can't be acquired immediately, we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(stmt->roles);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * pg_class stores InvalidOid in reltablespace for relations that live in
+     * our database's default tablespace, so search for that instead.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk pg_class to pick up the relations in the tablespace.  This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick-up objects in pg_catalog as part of this, if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking-up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * Caller must own every table whose persistence we're going to
+         * change.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(relForm->relkind),
+                           NameStr(relForm->relname));
+
+        /* Lock exactly once; with NOWAIT, error out instead of waiting */
+        if (!stmt->nowait)
+            LockRelationOid(relOid, AccessExclusiveLock);
+        else if (!ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+
+        /*
+         * Add to our list of objects of which we're going to change
+         * persistence.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index d4f8455a2b..ba605405a9 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4285,6 +4285,20 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_NODE_FIELD(roles);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5655,6 +5668,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index f1002afe7a..b76fc872a5 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1925,6 +1925,19 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_NODE_FIELD(roles);
+    COMPARE_SCALAR_FIELD(logged);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3650,6 +3662,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a03b33b53b..f8a41de2dd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1985,6 +1985,48 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->roles = $9;
+                    n->logged = true;
+                    n->nowait = $12;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->roles = $9;
+                    n->logged = false;
+                    n->nowait = $12;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 3780c6e812..80d1e360b3 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -163,6 +163,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1753,6 +1754,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2675,6 +2682,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 5d4037f26e..c381dad3e5 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 5d075f0c34..d8e1f223c8 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -430,6 +430,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1617702d9d..4fa9d9360f 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2352,6 +2352,16 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;    /* tablespace whose tables are changed */
+    ObjectType    objtype;        /* Object type to change (OBJECT_TABLE) */
+    List       *roles;            /* OWNED BY role list, or NIL for all */
+    bool        logged;            /* true: SET LOGGED, false: SET UNLOGGED */
+    bool        nowait;            /* error out instead of waiting for locks */
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
diff --git a/src/test/regress/expected/tablespace.out b/src/test/regress/expected/tablespace.out
index 2dfbcfdebe..c02afdcb68 100644
--- a/src/test/regress/expected/tablespace.out
+++ b/src/test/regress/expected/tablespace.out
@@ -943,5 +943,81 @@ drop cascades to table testschema.asexecute
 drop cascades to table testschema.part
 drop cascades to table testschema.atable
 drop cascades to table testschema.tablespace_acl
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+      OWNED BY regress_tablespace_user1 SET LOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | p
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+RESET ROLE;
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | u
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | u
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+NOTICE:  drop cascades to 8 other objects
+DETAIL:  drop cascades to table testschema.lsu
+drop cascades to table testschema.usu
+drop cascades to table testschema._lsu
+drop cascades to table testschema._usu
+drop cascades to table testschema.lu1
+drop cascades to table testschema.uu1
+drop cascades to table testschema._lu1
+drop cascades to table testschema._uu1
+DROP TABLESPACE regress_tablespace;
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/test/regress/sql/tablespace.sql b/src/test/regress/sql/tablespace.sql
index 896f05cea3..4e407eb8c0 100644
--- a/src/test/regress/sql/tablespace.sql
+++ b/src/test/regress/sql/tablespace.sql
@@ -419,5 +419,46 @@ DROP TABLESPACE regress_tblspace_renamed;
 
 DROP SCHEMA testschema CASCADE;
 
+
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+      OWNED BY regress_tablespace_user1 SET LOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+RESET ROLE;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+DROP TABLESPACE regress_tablespace;
+
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
-- 
2.27.0


pgsql-hackers by date:

Previous
From: Ajin Cherian
Date:
Subject: Re: logical replication empty transactions
Next
From: Bharath Rupireddy
Date:
Subject: Re: Allow async standbys wait for sync replication (was: Disallow quorum uncommitted (with synchronous standbys) txns in logical replication subscribers)