From 8b4769d8720eeb15402231fcbc4738e2e651da19 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 14 Feb 2022 17:48:03 +0530 Subject: [PATCH v9 6/6] WAL logged CREATE DATABASE Currently, CREATE DATABASE forces a checkpoint, then copies all the files, then forces another checkpoint. The comments in the createdb() function explain the reasons for this. The attached patch fixes this problem by making CREATE DATABASE completely WAL logged so that we can avoid the checkpoints. We also retain the old way of creating the database, selectable through a new STRATEGY option. If STRATEGY is set to FILE_COPY then the database is created using the old file-level copy method; if it is set to WAL_LOG then the new block-level WAL-logged method is used. WAL_LOG is the default. --- doc/src/sgml/ref/create_database.sgml | 22 + src/backend/commands/dbcommands.c | 840 +++++++++++++++++++++++++++------ src/include/commands/dbcommands_xlog.h | 8 + src/tools/pgindent/typedefs.list | 1 + 4 files changed, 739 insertions(+), 132 deletions(-) diff --git a/doc/src/sgml/ref/create_database.sgml b/doc/src/sgml/ref/create_database.sgml index f70d0c7..7c7bc0e 100644 --- a/doc/src/sgml/ref/create_database.sgml +++ b/doc/src/sgml/ref/create_database.sgml @@ -34,6 +34,7 @@ CREATE DATABASE name [ CONNECTION LIMIT [=] connlimit ] [ IS_TEMPLATE [=] istemplate ] [ OID [=] oid ] ] + [ STRATEGY [=] strategy ] ] @@ -240,6 +241,27 @@ CREATE DATABASE name + + strategy + + + This is used for copying the database directory. Currently, two + strategies are supported: the WAL_LOG and the + FILE_COPY. If the WAL_LOG + strategy is used then the database will be copied block by block and it + will also WAL log each copied block. Otherwise, if the FILE_COPY + strategy is used then it will do the file system level copy + so the individual blocks are not WAL logged. 
If the FILE_COPY + strategy is used then it has to issue a checkpoint before + and after performing the copy and if the shared buffers are large and + there are a lot of dirty buffers then issuing checkpoint would be + costly and it may impact the performance of the whole system. On the + other hand, if we WAL log each block then if the source database is + large then creating the database may take more time. + + + + diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index c37e3c9..cf5f475 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -46,6 +46,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/defrem.h" #include "commands/seclabel.h" +#include "commands/tablecmds.h" #include "commands/tablespace.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -63,13 +64,27 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/pg_locale.h" +#include "utils/relmapper.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +/* + * Create database strategy. The CREATEDB_WAL_LOG will copy the database at + * the block level and WAL log each copied block. Whereas the + * CREATEDB_FILE_COPY will directly copy the database at the file level and no + * individual operations will be WAL logged. + */ +typedef enum CreateDBStrategy +{ + CREATEDB_WAL_LOG = 0, + CREATEDB_FILE_COPY = 1 +} CreateDBStrategy; + typedef struct { Oid src_dboid; /* source (template) DB */ Oid dest_dboid; /* DB we are trying to create */ + CreateDBStrategy strategy; /* create db strategy */ } createdb_failure_params; typedef struct @@ -78,6 +93,19 @@ typedef struct Oid dest_tsoid; /* tablespace we are trying to move to */ } movedb_failure_params; +/* + * When creating a database, we scan the pg_class of the source database to + * identify all the relations to be copied. The structure is used for storing + * information about each relation of the source database. 
+ */ +typedef struct CreateDBRelInfo +{ + RelFileNode rnode; /* physical relation identifier */ + Oid reloid; /* relation oid */ + char relpersistence; /* relation's persistence level */ +} CreateDBRelInfo; + + /* non-export function prototypes */ static void createdb_failure_callback(int code, Datum arg); static void movedb(const char *dbname, const char *tblspcname); @@ -92,7 +120,588 @@ static bool have_createdb_privilege(void); static void remove_dbtablespaces(Oid db_id); static bool check_db_file_conflict(Oid db_id); static int errdetail_busy_db(int notherbackends, int npreparedxacts); +static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, + bool isRedo); +static List *GetRelListFromPage(Page page, Buffer buf, Oid tbid, Oid dbid, + char *srcpath, List *rnodelist, Snapshot + snapshot); +static List *GetDatabaseRelationList(Oid srctbid, Oid srcdbid, char *srcpath); +static void RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst, + ForkNumber forkNum, char relpersistence); +static void CopyDatabaseWithWal(Oid src_dboid, Oid dboid, Oid src_tsid, + Oid dst_tsid); +static void CopyDatabase(Oid src_dboid, Oid dboid, Oid src_tsid, Oid dst_tsid); + +/* + * CreateDirAndVersionFile - Create database directory and write out the + * PG_VERSION file in the database path. + * + * If isRedo is true, it's okay for the database directory to exist already. + * + * We can directly write PG_MAJORVERSION in the version file instead of copying + * from the source database file because these two must be the same. + */ +static void +CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo) +{ + int fd; + int nbytes; + char versionfile[MAXPGPATH]; + char buf[16]; + + /* Prepare version data before starting a critical section. */ + sprintf(buf, "%s\n", PG_MAJORVERSION); + nbytes = strlen(PG_MAJORVERSION) + 1; + + /* If we are not in WAL replay then write the WAL. 
*/ + if (!isRedo) + { + xl_dbase_create_rec xlrec; + XLogRecPtr lsn; + + /* Now errors are fatal ... */ + START_CRIT_SECTION(); + + xlrec.db_id = dbid; + xlrec.tablespace_id = tsid; + xlrec.src_db_id = InvalidOid; + xlrec.src_tablespace_id = InvalidOid; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), sizeof(xl_dbase_create_rec)); + + lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE); + + /* As always, WAL must hit the disk before the data update does. */ + XLogFlush(lsn); + } + + /* Create database directory. */ + if (MakePGDirectory(dbpath) < 0) + { + /* Failure other than already exists or not in WAL replay? */ + if (errno != EEXIST || !isRedo) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", dbpath))); + } + + /* + * Create PG_VERSION file in the database path. If the file already exists + * and we are in WAL replay then try again to open it in write mode. + */ + snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION"); + + fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY); + if (fd < 0 && errno == EEXIST && isRedo) + fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", versionfile))); + + /* Write PG_MAJORVERSION in the PG_VERSION file. */ + pgstat_report_wait_start(WAIT_EVENT_COPY_FILE_WRITE); + errno = 0; + if ((int) write(fd, buf, nbytes) != nbytes) + { + /* If write didn't set errno, assume problem is no disk space. */ + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", versionfile))); + } + pgstat_report_wait_end(); + + /* Close the version file. */ + CloseTransientFile(fd); + + /* Critical section done. */ + if (!isRedo) + END_CRIT_SECTION(); +} + +/* + * GetRelListFromPage - Helper function for GetDatabaseRelationList. 
+ * + * Iterate over each tuple of input pg_class and get a list of all the valid + * relfilenodes of the given block and append them to input rnodelist. + */ +static List * +GetRelListFromPage(Page page, Buffer buf, Oid tbid, Oid dbid, char *srcpath, + List *rnodelist, Snapshot snapshot) +{ + BlockNumber blkno = BufferGetBlockNumber(buf); + OffsetNumber offnum; + OffsetNumber maxoff; + HeapTupleData tuple; + Form_pg_class classForm; + + maxoff = PageGetMaxOffsetNumber(page); + + /* Iterate over each tuple on the page. */ + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemid; + + itemid = PageGetItemId(page, offnum); + + /* Nothing to do if slot is empty or already dead. */ + if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) || + ItemIdIsRedirected(itemid)) + continue; + + Assert(ItemIdIsNormal(itemid)); + ItemPointerSet(&(tuple.t_self), blkno, offnum); + + tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); + tuple.t_len = ItemIdGetLength(itemid); + tuple.t_tableOid = RelationRelationId; + + /* + * If the tuple is visible then add its relfilenode info to the + * list. + */ + if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf)) + { + Oid relfilenode = InvalidOid; + CreateDBRelInfo *relinfo; + + classForm = (Form_pg_class) GETSTRUCT(&tuple); + + /* We don't need to copy the shared objects to the target. */ + if (classForm->reltablespace == GLOBALTABLESPACE_OID) + continue; + + /* + * If the object doesn't have the storage then nothing to be + * done for that object so just ignore it. + */ + if (!RELKIND_HAS_STORAGE(classForm->relkind)) + continue; + + /* + * If relfilenode is valid then directly use it. Otherwise, + * consult the relmapper for the mapped relation. + */ + if (OidIsValid(classForm->relfilenode)) + relfilenode = classForm->relfilenode; + else + relfilenode = RelationMapOidToFilenodeForDatabase(srcpath, + classForm->oid); + + /* We must have a valid relfilenode oid. 
*/ + Assert(OidIsValid(relfilenode)); + + /* Prepare a rel info element and add it to the list. */ + relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo)); + if (OidIsValid(classForm->reltablespace)) + relinfo->rnode.spcNode = classForm->reltablespace; + else + relinfo->rnode.spcNode = tbid; + + relinfo->rnode.dbNode = dbid; + relinfo->rnode.relNode = relfilenode; + relinfo->reloid = classForm->oid; + relinfo->relpersistence = classForm->relpersistence; + + /* Add it to the list. */ + rnodelist = lappend(rnodelist, relinfo); + } + } + + return rnodelist; +} + +/* + * GetDatabaseRelationList - Get relfilenode list to be copied. + * + * Iterate over each block of the pg_class relation. From there, we will check + * all the visible tuples in order to get a list of all the valid relfilenodes + * in the source database that should be copied to the target database. + */ +static List * +GetDatabaseRelationList(Oid tbid, Oid dbid, char *srcpath) +{ + SMgrRelation rd_smgr; + RelFileNode rnode; + BlockNumber nblocks; + BlockNumber blkno; + Buffer buf; + Oid relfilenode; + Page page; + List *rnodelist = NIL; + LockRelId relid; + Snapshot snapshot; + BufferAccessStrategy bstrategy; + + /* Get pg_class relfilenode. */ + relfilenode = RelationMapOidToFilenodeForDatabase(srcpath, + RelationRelationId); + /* + * We are going to read the buffers associated with the pg_class relation. + * Thus, acquire the relation level lock before start scanning. As we are + * not connected to the database, we cannot use relation_open directly, so + * we have to lock using relation id. + */ + relid.dbId = dbid; + relid.relId = RelationRelationId; + LockRelationId(&relid, AccessShareLock); + + /* Prepare a relnode for pg_class relation. */ + rnode.spcNode = tbid; + rnode.dbNode = dbid; + rnode.relNode = relfilenode; + + /* + * We are not connected to the source database so open the pg_class + * relation at the smgr level and get the block count. 
+ */ + rd_smgr = smgropen(rnode, InvalidBackendId); + nblocks = smgrnblocks(rd_smgr, MAIN_FORKNUM); + + /* + * We're going to read the whole pg_class so better to use bulk-read buffer + * access strategy. + */ + bstrategy = GetAccessStrategy(BAS_BULKREAD); + + /* Get current active snapshot for scanning the pg_class. */ + snapshot = GetActiveSnapshot(); + + /* Iterate over each block on the pg_class relation. */ + for (blkno = 0; blkno < nblocks; blkno++) + { + /* + * We are not connected to the source database so directly use the lower + * level bufmgr interface which operates on the rnode. + */ + buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno, + RBM_NORMAL, bstrategy, + RELPERSISTENCE_PERMANENT); + + LockBuffer(buf, BUFFER_LOCK_SHARE); + page = BufferGetPage(buf); + if (PageIsNew(page) || PageIsEmpty(page)) + { + UnlockReleaseBuffer(buf); + continue; + } + + /* + * Process pg_class tuple for the current page and add all the valid + * relfilenode entries to the rnodelist. + */ + rnodelist = GetRelListFromPage(page, buf, tbid, dbid, srcpath, + rnodelist, snapshot); + + /* Release the buffer lock. */ + UnlockReleaseBuffer(buf); + } + + /* Release the lock. */ + UnlockRelationId(&relid, AccessShareLock); + + return rnodelist; +} + +/* + * RelationCopyStorageUsingBuffer - Copy fork's data using bufmgr. + * + * Same as RelationCopyStorage but instead of using smgrread and smgrextend + * this will copy using bufmgr APIs. + */ +static void +RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst, + ForkNumber forkNum, char relpersistence) +{ + Buffer srcBuf; + Buffer dstBuf; + Page srcPage; + Page dstPage; + bool use_wal; + bool copying_initfork; + BlockNumber nblocks; + BlockNumber blkno; + BufferAccessStrategy bstrategy_src; + BufferAccessStrategy bstrategy_dst; + + /* Refer comments in RelationCopyStorage. 
*/ + copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED && + forkNum == INIT_FORKNUM; + use_wal = XLogIsNeeded() && + (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork); + + /* Get number of blocks in the source relation. */ + nblocks = smgrnblocks(src, forkNum); + + /* + * We are going to copy whole relation from the source to the destination + * so use BAS_BULKREAD strategy for the source relation and BAS_BULKWRITE + * strategy for the destination relation. + */ + bstrategy_src = GetAccessStrategy(BAS_BULKREAD); + bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE); + + /* Iterate over each block of the source relation file. */ + for (blkno = 0; blkno < nblocks; blkno++) + { + /* If we got a cancel signal during the copy of the data, quit */ + CHECK_FOR_INTERRUPTS(); + + /* Read block from source relation. */ + srcBuf = ReadBufferWithoutRelcache(src->smgr_rnode.node, forkNum, + blkno, RBM_NORMAL, bstrategy_src, + relpersistence); + srcPage = BufferGetPage(srcBuf); + if (PageIsNew(srcPage) || PageIsEmpty(srcPage)) + { + ReleaseBuffer(srcBuf); + continue; + } + + /* Use P_NEW to extend the relation. */ + dstBuf = ReadBufferWithoutRelcache(dst->smgr_rnode.node, forkNum, + P_NEW, RBM_NORMAL, bstrategy_dst, + relpersistence); + LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE); + + START_CRIT_SECTION(); + + /* Initialize the page and write the data. */ + dstPage = BufferGetPage(dstBuf); + PageInit(dstPage, BufferGetPageSize(dstBuf), 0); + memcpy(dstPage, srcPage, BLCKSZ); + MarkBufferDirty(dstBuf); + + /* WAL-log the copied page. */ + if (use_wal) + log_newpage_buffer(dstBuf, true); + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(dstBuf); + ReleaseBuffer(srcBuf); + } +} + +/* + * CopyDatabaseWithWal - Copy source database to the target database with WAL + * + * Create target database directory and copy data files from the source + * database to the target database, block by block and WAL log all the + * operations. 
+ */ +static void +CopyDatabaseWithWal(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid) +{ + char *srcpath; + char *dstpath; + List *rnodelist = NULL; + ListCell *cell; + LockRelId relid; + RelFileNode srcrnode; + RelFileNode dstrnode; + CreateDBRelInfo *relinfo; + + /* Get the source database path. */ + srcpath = GetDatabasePath(src_dboid, src_tsid); + + /* Get the destination database path. */ + dstpath = GetDatabasePath(dst_dboid, dst_tsid); + + /* Create database directory and write PG_VERSION file. */ + CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false); + + /* Copy relmap file from source database to the destination database. */ + CopyRelationMap(dst_dboid, dst_tsid, srcpath, dstpath); + + /* Get list of all valid relnode from the source database. */ + rnodelist = GetDatabaseRelationList(src_tsid, src_dboid, srcpath); + Assert(rnodelist != NIL); + + /* + * Database id is common for all the relation so set it before entering to + * the loop. + */ + relid.dbId = src_dboid; + + /* + * Iterate over each relfilenode and copy the relation data block by block + * from source database to the destination database. + */ + foreach(cell, rnodelist) + { + SMgrRelation src_smgr; + SMgrRelation dst_smgr; + + relinfo = lfirst(cell); + srcrnode = relinfo->rnode; + + /* + * If the relation is from the source db's default tablespace then we + * need to create it in the destinations db's default tablespace. + * Otherwise, we need to create in the same tablespace as it is in the + * source database. + */ + if (srcrnode.spcNode == src_tsid) + dstrnode.spcNode = dst_tsid; + /* + * In case of ALTER DATABASE SET TABLESPACE we don't need to do + * anything for the object which are not in the source db's default + * tablespace. The source and destination dboid will be same in + * case of ALTER DATABASE SET TABLESPACE. 
+ */ + else if (src_dboid == dst_dboid) + continue; + else + dstrnode.spcNode = srcrnode.spcNode; + + dstrnode.dbNode = dst_dboid; + dstrnode.relNode = srcrnode.relNode; + + /* Acquire the lock on relation before start copying. */ + relid.relId = relinfo->reloid; + LockRelationId(&relid, AccessShareLock); + /* Open the source and the destination relation at smgr level. */ + src_smgr = smgropen(srcrnode, InvalidBackendId); + dst_smgr = smgropen(dstrnode, InvalidBackendId); + + /* Copy relation storage from source to the destination. */ + RelationCopyAllFork(src_smgr, dst_smgr, relinfo->relpersistence, + RelationCopyStorageUsingBuffer); + + /* Release the lock. */ + UnlockRelationId(&relid, AccessShareLock); + } + + list_free_deep(rnodelist); +} + +/* + * CopyDatabase - Copy source database to the target database + * + * Copy source database directory to the destination directory using copydir + * operation. + */ +static void +CopyDatabase(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid) +{ + TableScanDesc scan; + Relation rel; + HeapTuple tuple; + + /* + * Force a checkpoint before starting the copy. This will force all + * dirty buffers, including those of unlogged tables, out to disk, to + * ensure source database is up-to-date on disk for the copy. + * FlushDatabaseBuffers() would suffice for that, but we also want to + * process any pending unlink requests. Otherwise, if a checkpoint + * happened while we're copying files, a file might be deleted just + * when we're about to copy it, causing the lstat() call in copydir() + * to fail with ENOENT. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | + CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL); + + /* + * Iterate through all tablespaces of the template database, and copy + * each one to the new database. 
+ */ + rel = table_open(TableSpaceRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple); + Oid srctablespace = spaceform->oid; + Oid dsttablespace; + char *srcpath; + char *dstpath; + struct stat st; + + /* No need to copy global tablespace */ + if (srctablespace == GLOBALTABLESPACE_OID) + continue; + + srcpath = GetDatabasePath(src_dboid, srctablespace); + + if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || + directory_is_empty(srcpath)) + { + /* Assume we can ignore it */ + pfree(srcpath); + continue; + } + + if (srctablespace == src_tsid) + dsttablespace = dst_tsid; + else + dsttablespace = srctablespace; + + dstpath = GetDatabasePath(dst_dboid, dsttablespace); + + /* + * Copy this subdirectory to the new location + * + * We don't need to copy subdirectories + */ + copydir(srcpath, dstpath, false); + + /* Record the filesystem change in XLOG */ + { + xl_dbase_create_rec xlrec; + + xlrec.db_id = dst_dboid; + xlrec.tablespace_id = dsttablespace; + xlrec.src_db_id = src_dboid; + xlrec.src_tablespace_id = srctablespace; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec)); + + (void) XLogInsert(RM_DBASE_ID, + XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); + } + } + table_endscan(scan); + table_close(rel, AccessShareLock); + + /* + * We force a checkpoint before committing. This effectively means + * that committed XLOG_DBASE_CREATE operations will never need to be + * replayed (at least not in ordinary crash recovery; we still have to + * make the XLOG entry for the benefit of PITR operations). This + * avoids two nasty scenarios: + * + * #1: When PITR is off, we don't XLOG the contents of newly created + * indexes; therefore the drop-and-recreate-whole-directory behavior + * of DBASE_CREATE replay would lose such indexes. 
+ * + * #2: Since we have to recopy the source database during DBASE_CREATE + * replay, we run the risk of copying changes in it that were + * committed after the original CREATE DATABASE command but before the + * system crash that led to the replay. This is at least unexpected + * and at worst could lead to inconsistencies, eg duplicate table + * names. + * + * (Both of these were real bugs in releases 8.0 through 8.0.3.) + * + * In PITR replay, the first of these isn't an issue, and the second + * is only a risk if the CREATE DATABASE and subsequent template + * database change both occur while a base backup is being taken. + * There doesn't seem to be much we can do about that except document + * it as a limitation. + * + * Perhaps if we ever implement CREATE DATABASE in a less cheesy way, + * we can avoid this. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); +} /* * CREATE DATABASE @@ -100,8 +709,6 @@ static int errdetail_busy_db(int notherbackends, int npreparedxacts); Oid createdb(ParseState *pstate, const CreatedbStmt *stmt) { - TableScanDesc scan; - Relation rel; Oid src_dboid; Oid src_owner; int src_encoding = -1; @@ -132,6 +739,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) DefElem *dallowconnections = NULL; DefElem *dconnlimit = NULL; DefElem *dcollversion = NULL; + DefElem *dstrategy = NULL; char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; @@ -145,6 +753,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) char *dbcollversion = NULL; int notherbackends; int npreparedxacts; + CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG; createdb_failure_params fparms; /* Extract options from the statement node tree */ @@ -250,6 +859,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) (errcode(ERRCODE_INVALID_PARAMETER_VALUE)), errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId)); } + else if (strcmp(defel->defname, "strategy") == 0) + { + if 
(dstrategy) + errorConflictingDefElem(defel, pstate); + dstrategy = defel; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -374,6 +989,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) dbtemplate))); } + /* Validate the database creation strategy. */ + if (dstrategy && dstrategy->arg) + { + char *strategy; + + strategy = defGetString(dstrategy); + if (strcmp(strategy, "wal_log") == 0) + dbstrategy = CREATEDB_WAL_LOG; + else if (strcmp(strategy, "file_copy") == 0) + dbstrategy = CREATEDB_FILE_COPY; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%s is not a valid create database strategy", + strategy), + parser_errposition(pstate, dstrategy->location))); + } + /* If encoding or locales are defaulted, use source's setting */ if (encoding < 0) encoding = src_encoding; @@ -668,19 +1301,6 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0); /* - * Force a checkpoint before starting the copy. This will force all dirty - * buffers, including those of unlogged tables, out to disk, to ensure - * source database is up-to-date on disk for the copy. - * FlushDatabaseBuffers() would suffice for that, but we also want to - * process any pending unlink requests. Otherwise, if a checkpoint - * happened while we're copying files, a file might be deleted just when - * we're about to copy it, causing the lstat() call in copydir() to fail - * with ENOENT. - */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT - | CHECKPOINT_FLUSH_ALL); - - /* * Once we start copying subdirectories, we need to be able to clean 'em * up if we fail. Use an ENSURE block to make sure this happens. 
(This * is not a 100% solution, because of the possibility of failure during @@ -689,114 +1309,47 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) */ fparms.src_dboid = src_dboid; fparms.dest_dboid = dboid; + fparms.strategy = dbstrategy; + PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback, PointerGetDatum(&fparms)); { /* - * Iterate through all tablespaces of the template database, and copy - * each one to the new database. + * If the user has asked to create a database with WAL_LOG strategy + * then call CopyDatabaseWithWal, which will copy the database at the + * block level and it will WAL log each copied block. Otherwise, + * call CopyDatabase that will copy the database file by file. */ - rel = table_open(TableSpaceRelationId, AccessShareLock); - scan = table_beginscan_catalog(rel, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + if (dbstrategy == CREATEDB_WAL_LOG) { - Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple); - Oid srctablespace = spaceform->oid; - Oid dsttablespace; - char *srcpath; - char *dstpath; - struct stat st; - - /* No need to copy global tablespace */ - if (srctablespace == GLOBALTABLESPACE_OID) - continue; - - srcpath = GetDatabasePath(src_dboid, srctablespace); - - if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || - directory_is_empty(srcpath)) - { - /* Assume we can ignore it */ - pfree(srcpath); - continue; - } - - if (srctablespace == src_deftablespace) - dsttablespace = dst_deftablespace; - else - dsttablespace = srctablespace; - - dstpath = GetDatabasePath(dboid, dsttablespace); + CopyDatabaseWithWal(src_dboid, dboid, src_deftablespace, + dst_deftablespace); /* - * Copy this subdirectory to the new location - * - * We don't need to copy subdirectories + * Close pg_database, but keep lock till commit. 
*/ - copydir(srcpath, dstpath, false); - - /* Record the filesystem change in XLOG */ - { - xl_dbase_create_rec xlrec; - - xlrec.db_id = dboid; - xlrec.tablespace_id = dsttablespace; - xlrec.src_db_id = src_dboid; - xlrec.src_tablespace_id = srctablespace; - - XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec)); - - (void) XLogInsert(RM_DBASE_ID, - XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); - } + table_close(pg_database_rel, NoLock); } - table_endscan(scan); - table_close(rel, AccessShareLock); + else + { + Assert(dbstrategy == CREATEDB_FILE_COPY); - /* - * We force a checkpoint before committing. This effectively means - * that committed XLOG_DBASE_CREATE operations will never need to be - * replayed (at least not in ordinary crash recovery; we still have to - * make the XLOG entry for the benefit of PITR operations). This - * avoids two nasty scenarios: - * - * #1: When PITR is off, we don't XLOG the contents of newly created - * indexes; therefore the drop-and-recreate-whole-directory behavior - * of DBASE_CREATE replay would lose such indexes. - * - * #2: Since we have to recopy the source database during DBASE_CREATE - * replay, we run the risk of copying changes in it that were - * committed after the original CREATE DATABASE command but before the - * system crash that led to the replay. This is at least unexpected - * and at worst could lead to inconsistencies, eg duplicate table - * names. - * - * (Both of these were real bugs in releases 8.0 through 8.0.3.) - * - * In PITR replay, the first of these isn't an issue, and the second - * is only a risk if the CREATE DATABASE and subsequent template - * database change both occur while a base backup is being taken. - * There doesn't seem to be much we can do about that except document - * it as a limitation. - * - * Perhaps if we ever implement CREATE DATABASE in a less cheesy way, - * we can avoid this. 
- */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + CopyDatabase(src_dboid, dboid, src_deftablespace, + dst_deftablespace); - /* - * Close pg_database, but keep lock till commit. - */ - table_close(pg_database_rel, NoLock); + /* + * Close pg_database, but keep lock till commit. + */ + table_close(pg_database_rel, NoLock); - /* - * Force synchronous commit, thus minimizing the window between - * creation of the database files and committal of the transaction. If - * we crash before committing, we'll have a DB that's taking up disk - * space but is not in pg_database, which is not good. - */ - ForceSyncCommit(); + /* + * Force synchronous commit, thus minimizing the window between + * creation of the database files and committal of the transaction. + * If we crash before committing, we'll have a DB that's taking up + * disk space but is not in pg_database, which is not good. + */ + ForceSyncCommit(); + } } PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback, PointerGetDatum(&fparms)); @@ -870,6 +1423,21 @@ createdb_failure_callback(int code, Datum arg) createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg); /* + * If we were copying database at block levels then drop pages for the + * destination database that are in the shared buffer cache. And tell + * checkpointer to forget any pending fsync and unlink requests for + * files in the database. The reasoning behind doing this is same as + * explained in dropdb function. But unlike dropdb we don't need to call + * pgstat_drop_database because this database is still not created so there + * should not be any stat for this. + */ + if (fparms->strategy == CREATEDB_WAL_LOG) + { + DropDatabaseBuffers(fparms->dest_dboid); + ForgetDatabaseSyncRequests(fparms->dest_dboid); + } + + /* * Release lock on source database before doing recursive remove. This is * not essential but it seems desirable to release the lock as soon as * possible. 
@@ -2387,32 +2955,40 @@ dbase_redo(XLogReaderState *record) src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id); dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); - /* - * Our theory for replaying a CREATE is to forcibly drop the target - * subdirectory if present, then re-copy the source data. This may be - * more work than needed, but it is simple to implement. - */ - if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode)) + if (!OidIsValid(xlrec->src_db_id)) { - if (!rmtree(dst_path, true)) - /* If this failed, copydir() below is going to error. */ - ereport(WARNING, - (errmsg("some useless files may be left behind in old database directory \"%s\"", - dst_path))); + CreateDirAndVersionFile(dst_path, xlrec->db_id, xlrec->tablespace_id, + true); } + else + { + /* + * Our theory for replaying a CREATE is to forcibly drop the target + * subdirectory if present, then re-copy the source data. This may be + * more work than needed, but it is simple to implement. + */ + if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode)) + { + if (!rmtree(dst_path, true)) + /* If this failed, copydir() below is going to error. */ + ereport(WARNING, + (errmsg("some useless files may be left behind in old database directory \"%s\"", + dst_path))); + } - /* - * Force dirty buffers out to disk, to ensure source database is - * up-to-date for the copy. - */ - FlushDatabaseBuffers(xlrec->src_db_id); + /* + * Force dirty buffers out to disk, to ensure source database is + * up-to-date for the copy. 
+ */ + FlushDatabaseBuffers(xlrec->src_db_id); - /* - * Copy this subdirectory to the new location - * - * We don't need to copy subdirectories - */ - copydir(src_path, dst_path, false); + /* + * Copy this subdirectory to the new location + * + * We don't need to copy subdirectories + */ + copydir(src_path, dst_path, false); + } } else if (info == XLOG_DBASE_DROP) { diff --git a/src/include/commands/dbcommands_xlog.h b/src/include/commands/dbcommands_xlog.h index 593a857..8f59870 100644 --- a/src/include/commands/dbcommands_xlog.h +++ b/src/include/commands/dbcommands_xlog.h @@ -20,6 +20,7 @@ /* record types */ #define XLOG_DBASE_CREATE 0x00 #define XLOG_DBASE_DROP 0x10 +#define XLOG_DBASE_CREATEDIR 0x20 typedef struct xl_dbase_create_rec { @@ -30,6 +31,13 @@ typedef struct xl_dbase_create_rec Oid src_tablespace_id; } xl_dbase_create_rec; +typedef struct xl_dbase_createdir_rec +{ + /* Records creating database directory */ + Oid db_id; + Oid tablespace_id; +} xl_dbase_createdir_rec; + typedef struct xl_dbase_drop_rec { Oid db_id; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c44b5a9..5e34ea3 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -460,6 +460,7 @@ CoverPos CreateAmStmt CreateCastStmt CreateConversionStmt +CreateDBRelInfo CreateDomainStmt CreateEnumStmt CreateEventTrigStmt -- 1.8.3.1