From 050dd60e9c8b1e2b9447d13ff9359b2a2e8efa8a Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 4 Dec 2018 14:06:32 +0530 Subject: [PATCH] Undo worker and transaction rollback Patch provides mechanism for discarding the older undo when they are no longer relevant. It also provides a mechanism for rolling back the effect of the aborted transactions. It handles the rollbacks directly from backends as well as from undo worker for the larger transaction. Dilip Kumar, with help from Rafia Sabih, Amit Kapila, Mithun.CY, Thomas Munro and Kuntal Ghosh --- src/backend/access/rmgrdesc/Makefile | 3 +- src/backend/access/rmgrdesc/undoactiondesc.c | 64 +++ src/backend/access/rmgrdesc/xlogdesc.c | 4 +- src/backend/access/transam/rmgr.c | 5 +- src/backend/access/transam/twophase.c | 45 +- src/backend/access/transam/varsup.c | 12 + src/backend/access/transam/xact.c | 441 ++++++++++++++++- src/backend/access/transam/xlog.c | 20 + src/backend/access/undo/Makefile | 2 +- src/backend/access/undo/undoaction.c | 683 ++++++++++++++++++++++++++ src/backend/access/undo/undoactionxlog.c | 49 ++ src/backend/access/undo/undodiscard.c | 458 +++++++++++++++++ src/backend/access/undo/undoinsert.c | 50 +- src/backend/access/undo/undorecord.c | 3 + src/backend/commands/vacuum.c | 11 + src/backend/postmaster/Makefile | 4 +- src/backend/postmaster/bgworker.c | 11 + src/backend/postmaster/discardworker.c | 170 +++++++ src/backend/postmaster/pgstat.c | 7 +- src/backend/postmaster/postmaster.c | 7 + src/backend/postmaster/undoworker.c | 664 +++++++++++++++++++++++++ src/backend/replication/logical/decode.c | 4 + src/backend/storage/ipc/ipci.c | 6 + src/backend/storage/lmgr/lwlocknames.txt | 2 + src/backend/storage/lmgr/proc.c | 2 + src/backend/utils/adt/lockfuncs.c | 1 + src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc.c | 11 + src/backend/utils/misc/pg_controldata.c | 9 +- src/backend/utils/misc/postgresql.conf.sample | 7 + src/bin/pg_controldata/pg_controldata.c | 2 + 
src/bin/pg_resetwal/pg_resetwal.c | 7 + src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 3 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 47 +- src/include/access/transam.h | 4 + src/include/access/twophase.h | 2 +- src/include/access/undoaction.h | 28 ++ src/include/access/undoaction_xlog.h | 74 +++ src/include/access/undodiscard.h | 31 ++ src/include/access/undoinsert.h | 2 + src/include/access/undorecord.h | 13 + src/include/access/xact.h | 1 + src/include/access/xlog.h | 3 + src/include/access/xlog_internal.h | 9 +- src/include/catalog/pg_control.h | 7 + src/include/miscadmin.h | 1 + src/include/pgstat.h | 3 + src/include/postmaster/discardworker.h | 25 + src/include/postmaster/undoloop.h | 89 ++++ src/include/postmaster/undoworker.h | 39 ++ src/include/storage/lock.h | 10 + src/include/storage/proc.h | 2 + 54 files changed, 3117 insertions(+), 45 deletions(-) create mode 100644 src/backend/access/rmgrdesc/undoactiondesc.c create mode 100644 src/backend/access/undo/undoaction.c create mode 100644 src/backend/access/undo/undoactionxlog.c create mode 100644 src/backend/access/undo/undodiscard.c create mode 100644 src/backend/postmaster/discardworker.c create mode 100644 src/backend/postmaster/undoworker.c create mode 100644 src/include/access/undoaction.h create mode 100644 src/include/access/undoaction_xlog.h create mode 100644 src/include/access/undodiscard.h create mode 100644 src/include/postmaster/discardworker.h create mode 100644 src/include/postmaster/undoloop.h create mode 100644 src/include/postmaster/undoworker.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 91ad1ef..640d37f 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ 
mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o + smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ + undologdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c new file mode 100644 index 0000000..89343b8 --- /dev/null +++ b/src/backend/access/rmgrdesc/undoactiondesc.c @@ -0,0 +1,64 @@ +/*------------------------------------------------------------------------- + * + * undoactiondesc.c + * rmgr descriptor routines for access/undo/undoactionxlog.c + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/undoactiondesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" + +void +undoaction_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_UNDO_PAGE) + { + uint8 *flags = (uint8 *) rec; + + appendStringInfo(buf, "page_contains_tpd_slot: %c ", + (*flags & XLU_PAGE_CONTAINS_TPD_SLOT) ? 'T' : 'F'); + appendStringInfo(buf, "is_page_initialized: %c ", + (*flags & XLU_INIT_PAGE) ? 
'T' : 'F'); + if (*flags & XLU_PAGE_CONTAINS_TPD_SLOT) + { + xl_undoaction_page *xlrec = + (xl_undoaction_page *) ((char *) flags + sizeof(uint8)); + + appendStringInfo(buf, "urec_ptr %lu xid %u trans_slot_id %u", + xlrec->urec_ptr, xlrec->xid, xlrec->trans_slot_id); + } + } + else if (info == XLOG_UNDO_APPLY_PROGRESS) + { + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec; + + appendStringInfo(buf, "urec_ptr %lu progress %u", + xlrec->urec_ptr, xlrec->progress); + } +} + +const char * +undoaction_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_UNDO_APPLY_PROGRESS: + id = "UNDO APPLY PROGRESS"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 00741c7..987e39c 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -47,7 +47,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " - "oldest running xid %u; %s", + "oldest running xid %u; " + "oldest xid with epoch having undo " UINT64_FORMAT "; %s", (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, checkpoint->ThisTimeLineID, checkpoint->PrevTimeLineID, @@ -63,6 +64,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->oldestCommitTsXid, checkpoint->newestCommitTsXid, checkpoint->oldestActiveXid, + checkpoint->oldestXidWithEpochHavingUndo, (info == XLOG_CHECKPOINT_SHUTDOWN) ? 
"shutdown" : "online"); } else if (info == XLOG_NEXTOID) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8b05374..6238240 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -18,6 +18,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -31,8 +32,8 @@ #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ + { name, redo, desc, identify, startup, cleanup, mask, undo, undo_desc }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e65dccc..2c640f9 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -93,6 +93,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "postmaster/undoloop.h" #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" @@ -915,6 +916,12 @@ typedef struct TwoPhaseFileHeader uint16 gidlen; /* length of the GID - GID follows the header */ XLogRecPtr origin_lsn; /* lsn of this record at origin node */ TimestampTz origin_timestamp; /* time of prepare at origin node */ + /* + * We need the locations of start and end undo record pointers when rollbacks + * are to be performed for prepared transactions using zheap relations. 
+ */ + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; } TwoPhaseFileHeader; /* @@ -989,7 +996,8 @@ save_state_data(const void *data, uint32 len) * Initializes data structure and inserts the 2PC file header record. */ void -StartPrepare(GlobalTransaction gxact) +StartPrepare(GlobalTransaction gxact, UndoRecPtr *start_urec_ptr, + UndoRecPtr *end_urec_ptr) { PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -1027,6 +1035,10 @@ StartPrepare(GlobalTransaction gxact) &hdr.initfileinval); hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */ + /* save the start and end undo record pointers */ + memcpy(hdr.start_urec_ptr, start_urec_ptr, sizeof(hdr.start_urec_ptr)); + memcpy(hdr.end_urec_ptr, end_urec_ptr, sizeof(hdr.end_urec_ptr)); + save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); save_state_data(gxact->gid, hdr.gidlen); @@ -1452,6 +1464,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RelFileNode *delrels; int ndelrels; SharedInvalidationMessage *invalmsgs; + int i; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; /* * Validate the GID, and lock the GXACT to ensure that two backends do not @@ -1489,6 +1504,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit) invalmsgs = (SharedInvalidationMessage *) bufptr; bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + /* save the start and end undo record pointers */ + memcpy(start_urec_ptr, hdr->start_urec_ptr, sizeof(start_urec_ptr)); + memcpy(end_urec_ptr, hdr->end_urec_ptr, sizeof(end_urec_ptr)); + + /* + * Perform undo actions, if there are undologs for this transaction. + * We need to perform undo actions while we are still in transaction. + * Never push rollbacks of temp tables to undo worker. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (end_urec_ptr[i] != InvalidUndoRecPtr && !isCommit) + { + bool result = false; + uint64 rollback_size = 0; + + if (i != UNDO_TEMP) + rollback_size = end_urec_ptr[i] - start_urec_ptr[i]; + + if (rollback_size >= rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(end_urec_ptr[i], start_urec_ptr[i], InvalidOid); + + if (!result) + execute_undo_actions(end_urec_ptr[i], start_urec_ptr[i], true, + true, false); + } + } + /* compute latestXid among all children */ latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children); diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index a5eb29e..31bea0e 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -284,10 +284,22 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; + TransactionId oldestXidHavingUndo; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* + * To determine the last safe xid that can be allocated, we need to + * consider oldestXidHavingUndo. The oldestXidHavingUndo will be only + * valid for zheap storage engine, so it won't impact any other storage + * engine. + */ + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + if (TransactionIdIsValid(oldestXidHavingUndo)) + oldest_datfrozenxid = Min(oldest_datfrozenxid, oldestXidHavingUndo); + + /* * The place where we actually get into deep trouble is halfway around * from the oldest potentially-existing XID. 
(This calculation is * probably off by one or two counts, because the special XIDs reduce the diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 6060013..66d1685 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -42,6 +42,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/undoloop.h" #include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/origin.h" @@ -280,6 +281,20 @@ typedef struct SubXactCallbackItem static SubXactCallbackItem *SubXact_callbacks = NULL; +/* Location in undo log from where to start applying the undo actions. */ +static UndoRecPtr UndoActionStartPtr[UndoPersistenceLevels] = + {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + +/* Location in undo log up to which undo actions need to be applied. */ +static UndoRecPtr UndoActionEndPtr[UndoPersistenceLevels] = + {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + +/* Do we need to perform any undo actions? */ +static bool PerformUndoActions = false; /* local function prototypes */ static void AssignTransactionId(TransactionState s); @@ -1826,6 +1841,7 @@ StartTransaction(void) { TransactionState s; VirtualTransactionId vxid; + int i; /* * Let's just make sure the state stack is empty @@ -1904,6 +1920,13 @@ StartTransaction(void) nUnreportedXids = 0; s->didLogXid = false; + /* initialize undo record locations for the transaction */ + for(i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + /* * must initialize resource-management stuff first */ @@ -2213,7 +2236,7 @@ CommitTransaction(void) * NB: if you change this routine, better look at CommitTransaction too! 
*/ static void -PrepareTransaction(void) +PrepareTransaction(UndoRecPtr *start_urec_ptr, UndoRecPtr *end_urec_ptr) { TransactionState s = CurrentTransactionState; TransactionId xid = GetCurrentTransactionId(); @@ -2361,7 +2384,7 @@ PrepareTransaction(void) * PREPARED; in particular, pay attention to whether things should happen * before or after releasing the transaction's locks. */ - StartPrepare(gxact); + StartPrepare(gxact, start_urec_ptr, end_urec_ptr); AtPrepare_Notify(); AtPrepare_Locks(); @@ -2484,6 +2507,65 @@ PrepareTransaction(void) RESUME_INTERRUPTS(); } +static void +AtAbort_Rollback(void) +{ + TransactionState s = CurrentTransactionState; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + int i; + + /* XXX: TODO: check this logic, which was moved out of UserAbortTransactionBlock */ + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + + /* + * Remember the required information for performing undo actions. So that + * if there is any failure in executing the undo action we can execute + * it later. + */ + memcpy (UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, s->start_urec_ptr, sizeof(UndoActionEndPtr)); + + /* + * If we are in a valid transaction state then execute the undo action here + * itself, otherwise we have already stored the required information for + * executing the undo action later. + */ + if (CurrentTransactionState->state == TRANS_INPROGRESS) + { + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (latest_urec_ptr[i]) + { + if (i == UNDO_TEMP) + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + false, true, true); + else + { + uint64 size = latest_urec_ptr[i] - s->start_urec_ptr[i]; + bool result = false; + + /* + * If this is a large rollback request then push it to undo-worker + * through RollbackHT, undo-worker will perform it's undo actions + * later. 
+ */ + if (size >= rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(UndoActionStartPtr[i], UndoActionEndPtr[i], InvalidOid); + + if (!result) + { + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + true, true, true); + UndoActionStartPtr[i] = InvalidUndoRecPtr; + } + } + } + } + } + else + PerformUndoActions = true; +} /* * AbortTransaction @@ -2585,6 +2667,7 @@ AbortTransaction(void) */ AfterTriggerEndXact(false); /* 'false' means it's abort */ AtAbort_Portals(); + AtAbort_Rollback(); AtEOXact_LargeObject(false); AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); @@ -2794,6 +2877,12 @@ void CommitTransactionCommand(void) { TransactionState s = CurrentTransactionState; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i; + + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); + memcpy(end_urec_ptr, s->latest_urec_ptr, sizeof(end_urec_ptr)); switch (s->blockState) { @@ -2883,7 +2972,7 @@ CommitTransactionCommand(void) * return to the idle state. */ case TBLOCK_PREPARE: - PrepareTransaction(); + PrepareTransaction(start_urec_ptr, end_urec_ptr); s->blockState = TBLOCK_DEFAULT; break; @@ -2929,6 +3018,23 @@ CommitTransactionCommand(void) { CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ + + /* + * Update the end undo record pointer if it's not valid with + * the currently popped transaction's end undo record pointer. + * This is particularly required when the first command of + * the transaction is of type which does not require an undo, + * e.g. savepoint x. + * Accordingly, update the start undo record pointer. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(end_urec_ptr[i])) + end_urec_ptr[i] = s->latest_urec_ptr[i]; + + if (UndoRecPtrIsValid(s->start_urec_ptr[i])) + start_urec_ptr[i] = s->start_urec_ptr[i]; + } } while (s->blockState == TBLOCK_SUBCOMMIT); /* If we had a COMMIT command, finish off the main xact too */ if (s->blockState == TBLOCK_END) @@ -2940,7 +3046,7 @@ CommitTransactionCommand(void) else if (s->blockState == TBLOCK_PREPARE) { Assert(s->parent == NULL); - PrepareTransaction(); + PrepareTransaction(start_urec_ptr, end_urec_ptr); s->blockState = TBLOCK_DEFAULT; } else @@ -3034,6 +3140,18 @@ void AbortCurrentTransaction(void) { TransactionState s = CurrentTransactionState; + int i; + + /* + * The undo actions are allowed to be executed at the end of statement + * execution when we are not in transaction block, otherwise they are + * executed when user explicitly ends the transaction. + * + * So if we are in a transaction block don't set the PerformUndoActions + * because this flag will be set when user explicitly issue rollback or + * rollback to savepoint. + */ + PerformUndoActions = false; switch (s->blockState) { @@ -3068,6 +3186,16 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * We are outside the transaction block so remember the required + * information to perform undo actions and also set the + * PerformUndoActions so that we execute it before completing this + * command. + */ + PerformUndoActions = true; + memcpy (UndoActionStartPtr, s->latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, s->start_urec_ptr, sizeof(UndoActionEndPtr)); break; /* @@ -3104,6 +3232,9 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* Failed during commit, so we need to perform the undo actions. 
*/ + PerformUndoActions = true; break; /* @@ -3123,6 +3254,9 @@ AbortCurrentTransaction(void) case TBLOCK_ABORT_END: CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* Failed during commit, so we need to perform the undo actions. */ + PerformUndoActions = true; break; /* @@ -3133,6 +3267,12 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * Failed while executing the rollback command, need perform any + * pending undo actions. + */ + PerformUndoActions = true; break; /* @@ -3144,6 +3284,12 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * Perform any pending actions if failed while preparing the + * transaction. + */ + PerformUndoActions = true; break; /* @@ -3166,6 +3312,17 @@ AbortCurrentTransaction(void) case TBLOCK_SUBCOMMIT: case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: + /* + * If we are here and still UndoActionStartPtr is valid that means + * the subtransaction failed while executing the undo action, so + * store its undo action start point in parent so that parent can + * start its undo action from this point. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(UndoActionStartPtr[i])) + s->parent->latest_urec_ptr[i] = UndoActionStartPtr[i]; + } AbortSubTransaction(); CleanupSubTransaction(); AbortCurrentTransaction(); @@ -3183,6 +3340,109 @@ AbortCurrentTransaction(void) } /* + * XactPerformUndoActionsIfPending - Execute pending undo actions. + * + * If the parent transaction state is valid (when there is an error in the + * subtransaction and rollback to savepoint is executed), then allow to + * perform undo actions in it, otherwise perform them in a new transaction. 
+ */ +void +XactPerformUndoActionsIfPending() +{ + TransactionState s = CurrentTransactionState; + uint64 rollback_size = 0; + bool new_xact = true, result = false, no_pending_action = true; + UndoRecPtr parent_latest_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + if (!PerformUndoActions) + return; + + /* If there is no undo log for any persistence level, then return. */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(UndoActionStartPtr[i])) + { + no_pending_action = false; + break; + } + } + + if (no_pending_action) + { + PerformUndoActions = false; + return; + } + + /* + * Execute undo actions under parent transaction, if any. Otherwise start + * a new transaction. + */ + if (GetTopTransactionIdIfAny() != InvalidTransactionId) + { + memcpy(parent_latest_urec_ptr, s->latest_urec_ptr, + sizeof (parent_latest_urec_ptr)); + new_xact = false; + } + + /* + * If this is a large rollback request then push it to undo-worker + * through RollbackHT, undo-worker will perform it's undo actions later. + * Never push the rollbacks for temp tables. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(UndoActionStartPtr[i])) + continue; + + if (i == UNDO_TEMP) + goto perform_rollback; + else + rollback_size = UndoActionStartPtr[i] - UndoActionEndPtr[i]; + + if (new_xact && rollback_size > rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(UndoActionStartPtr[i], UndoActionEndPtr[i], InvalidOid); + + if (!result) + { +perform_rollback: + if (new_xact) + { + TransactionState xact; + + /* Start a new transaction for performing the rollback */ + StartTransactionCommand(); + xact = CurrentTransactionState; + + /* + * Store the previous transactions start and end undo record + * pointers into this transaction's state so that if there is + * some error while performing undo actions we can restart + * from begining. 
+ */ + memcpy(xact->start_urec_ptr, UndoActionEndPtr, + sizeof(UndoActionEndPtr)); + memcpy(xact->latest_urec_ptr, UndoActionStartPtr, + sizeof(UndoActionStartPtr)); + } + + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + new_xact, true, true); + + if (new_xact) + CommitTransactionCommand(); + else + { + /* Restore parent's state. */ + s->latest_urec_ptr[i] = parent_latest_urec_ptr[i]; + } + } + } + + PerformUndoActions = false; +} + +/* * PreventInTransactionBlock * * This routine is to be called by statements that must not run inside @@ -3583,6 +3843,10 @@ EndTransactionBlock(void) { TransactionState s = CurrentTransactionState; bool result = false; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + int i ; + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); switch (s->blockState) { @@ -3628,6 +3892,16 @@ EndTransactionBlock(void) elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); s = s->parent; + + /* + * We are calculating latest_urec_ptr, even though its a commit + * case. This is to handle any error during the commit path. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + } } if (s->blockState == TBLOCK_INPROGRESS) s->blockState = TBLOCK_END; @@ -3653,6 +3927,11 @@ EndTransactionBlock(void) elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); s = s->parent; + for (i = 0; i < UndoPersistenceLevels; i++) + { + if(!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + } } if (s->blockState == TBLOCK_INPROGRESS) s->blockState = TBLOCK_ABORT_PENDING; @@ -3705,6 +3984,18 @@ EndTransactionBlock(void) break; } + /* + * We need to perform undo actions if the transaction is failed. Remember + * the required information to perform undo actions at the end of + * statement execution. 
+ */ + if (!result) + PerformUndoActions = true; + + memcpy(UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy(UndoActionEndPtr, TopTransactionStateData.start_urec_ptr, + sizeof(UndoActionEndPtr)); + return result; } @@ -3962,6 +4253,12 @@ ReleaseSavepoint(const char *name) TransactionState s = CurrentTransactionState; TransactionState target, xact; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); /* * Workers synchronize transaction state at the beginning of each parallel @@ -4055,8 +4352,32 @@ ReleaseSavepoint(const char *name) if (xact == target) break; xact = xact->parent; + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = xact->latest_urec_ptr[i]; + + if (UndoRecPtrIsValid(xact->start_urec_ptr[i])) + start_urec_ptr[i] = xact->start_urec_ptr[i]; + } + Assert(PointerIsValid(xact)); } + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's latest_urec_ptr + * so that in case parent transaction get aborted we will not skip + * performing undo for this transaction. Also set the start_urec_ptr if + * parent start_urec_ptr is not valid. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + xact->parent->latest_urec_ptr[i] = latest_urec_ptr[i]; + if (!UndoRecPtrIsValid(xact->parent->start_urec_ptr[i])) + xact->parent->start_urec_ptr[i] = start_urec_ptr[i]; + } } /* @@ -4267,6 +4588,7 @@ void ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; /* * Workers synchronize transaction state at the beginning of each parallel @@ -4285,6 +4607,22 @@ ReleaseCurrentSubTransaction(void) BlockStateAsString(s->blockState)); Assert(s->state == TRANS_INPROGRESS); MemoryContextSwitchTo(CurTransactionContext); + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's latest_urec_ptr + * so that in case parent transaction get aborted we will not skip + * performing undo for this transaction. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + s->parent->latest_urec_ptr[i] = s->latest_urec_ptr[i]; + + if (!UndoRecPtrIsValid(s->parent->start_urec_ptr[i])) + s->parent->start_urec_ptr[i] = s->start_urec_ptr[i]; + } + CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ Assert(s->state == TRANS_INPROGRESS); @@ -4301,6 +4639,14 @@ void RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels] = {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels] = {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + UndoRecPtr parent_latest_urec_ptr[UndoPersistenceLevels]; + int i; /* * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted @@ -4347,6 +4693,19 @@ RollbackAndReleaseCurrentSubTransaction(void) if (s->blockState == TBLOCK_SUBINPROGRESS) AbortSubTransaction(); + /* + * Remember the required information to perform 
undo actions before + * cleaning up the subtransaction state. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + { + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + start_urec_ptr[i] = s->start_urec_ptr[i]; + } + } + /* And clean it up, too */ CleanupSubTransaction(); @@ -4355,6 +4714,30 @@ RollbackAndReleaseCurrentSubTransaction(void) s->blockState == TBLOCK_INPROGRESS || s->blockState == TBLOCK_IMPLICIT_INPROGRESS || s->blockState == TBLOCK_STARTED); + + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + { + parent_latest_urec_ptr[i] = s->latest_urec_ptr[i]; + + /* + * Store the undo action start point in the parent state so that + * we can apply undo actions these undos also during rollback of + * parent transaction in case of error while applying the undo + * actions. + */ + s->latest_urec_ptr[i] = latest_urec_ptr[i]; + execute_undo_actions(latest_urec_ptr[i], start_urec_ptr[i], false, + true, true); + + /* Restore parent state. */ + s->latest_urec_ptr[i] = parent_latest_urec_ptr[i]; + } + } + + /* Successfully performed undo actions so reset the flag. 
*/ + PerformUndoActions = false; } /* @@ -4568,6 +4951,7 @@ static void StartSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; if (s->state != TRANS_DEFAULT) elog(WARNING, "StartSubTransaction while in %s state", @@ -4585,6 +4969,13 @@ StartSubTransaction(void) AtSubStart_Notify(); AfterTriggerBeginSubXact(); + /* initialize undo record locations for the transaction */ + for(i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + s->state = TRANS_INPROGRESS; /* @@ -4708,6 +5099,47 @@ CommitSubTransaction(void) PopTransaction(); } +static void +AtSubAbort_Rollback(TransactionState s) +{ + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + /* XXX: TODO: Check this logic, which was moved out of RollbackToSavepoint() */ + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); + + /* + * Remember the required information for performing undo actions. So that + * if there is any failure in executing the undo action we can execute + * it later. + */ + memcpy (UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, start_urec_ptr, sizeof(UndoActionEndPtr)); + + /* + * If we are in a valid transaction state then execute the undo action here + * itself, otherwise we have already stored the required information for + * executing the undo action later. 
+ */ + if (s->state == TRANS_INPROGRESS) + { + for ( i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + { + execute_undo_actions(latest_urec_ptr[i], start_urec_ptr[i], false, true, false); + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + UndoActionStartPtr[i] = InvalidUndoRecPtr; + } + } + } + else + PerformUndoActions = true; +} + /* * AbortSubTransaction */ @@ -4802,6 +5234,7 @@ AbortSubTransaction(void) s->parent->subTransactionId, s->curTransactionOwner, s->parent->curTransactionOwner); + AtSubAbort_Rollback(s); AtEOSubXact_LargeObject(false, s->subTransactionId, s->parent->subTransactionId); AtSubAbort_Notify(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 01815a6..e575322 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5170,6 +5170,7 @@ BootStrapXLOG(void) checkPoint.newestCommitTsXid = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; + checkPoint.oldestXidWithEpochHavingUndo = InvalidTransactionId; ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -6604,6 +6605,10 @@ StartupXLOG(void) (errmsg_internal("commit timestamp Xid oldest/newest: %u/%u", checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid))); + ereport(DEBUG1, + (errmsg_internal("oldest xid with epoch having undo: " UINT64_FORMAT, + checkPoint.oldestXidWithEpochHavingUndo))); + if (!TransactionIdIsNormal(checkPoint.nextXid)) ereport(PANIC, (errmsg("invalid next transaction ID"))); @@ -6621,6 +6626,10 @@ StartupXLOG(void) XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; + /* Read oldest xid having undo from checkpoint and set in proc global. 
*/ + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -8792,6 +8801,9 @@ CreateCheckPoint(int flags) checkPoint.nextOid += ShmemVariableCache->oidCount; LWLockRelease(OidGenLock); + checkPoint.oldestXidWithEpochHavingUndo = + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo); + MultiXactGetCheckptMulti(shutdown, &checkPoint.nextMulti, &checkPoint.nextMultiOffset, @@ -9702,6 +9714,9 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceOldest(checkPoint.oldestMulti, checkPoint.oldestMultiDB); + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * No need to set oldestClogXid here as well; it'll be set when we * redo an xl_clog_truncate if it changed since initialization. @@ -9760,6 +9775,8 @@ xlog_redo(XLogReaderState *record) /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo = + checkPoint.oldestXidWithEpochHavingUndo; /* Update shared-memory copy of checkpoint XID/epoch */ SpinLockAcquire(&XLogCtl->info_lck); @@ -9809,6 +9826,9 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * NB: This may perform multixact truncation when replaying WAL * generated by an older primary. diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index f41e8f7..fdf7f7d 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/access/undo top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = undoinsert.o undolog.o undorecord.o +OBJS = undoaction.o undoactionxlog.o undodiscard.o undoinsert.o undolog.o undorecord.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c new file mode 100644 index 0000000..562630e --- /dev/null +++ b/src/backend/access/undo/undoaction.c @@ -0,0 +1,683 @@ +/*------------------------------------------------------------------------- + * + * undoaction.c + * execute undo actions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoaction.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/undoaction.h" +#include "access/undoaction_xlog.h" +#include "access/undolog.h" +#include "access/undorecord.h" +#include "access/visibilitymap.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "postmaster/undoloop.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "utils/rel.h" +#include "utils/relfilenodemap.h" +#include "miscadmin.h" +#include "storage/shmem.h" +#include "access/undodiscard.h" + +#define ROLLBACK_HT_SIZE 1024 + +static bool execute_undo_actions_page(List *luinfo, UndoRecPtr urec_ptr, + Oid reloid, TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool norellock, int options); +static void RollbackHTRemoveEntry(UndoRecPtr start_urec_ptr); + +/* This is the hash table to store all the rollabck requests. */ +static HTAB *RollbackHT; + +/* + * execute_undo_actions - Execute the undo actions + * + * from_urecptr - undo record pointer from where to start applying undo action. 
+ * to_urecptr - undo record pointer up to which point to apply undo action.
+ * nopartial - true if rollback is for complete transaction.
+ * rewind - whether to rewind the insert location of the undo log or not.
+ *          Only the backend that executed the transaction can rewind, but
+ *          any other process e.g. undo worker should not rewind it.
+ *          Because, if the backend has already inserted new undo records
+ *          for the next transaction and if we rewind then we will lose
+ *          the undo record inserted for the new transaction.
+ * rellock - if the caller already has the lock on the required relation,
+ *          then this flag is false, i.e. we do not need to acquire any
+ *          lock here. If the flag is true then we need to acquire lock
+ *          here itself, because caller will not be having any lock.
+ *          When we are performing undo actions for prepared transactions,
+ *          or for rollback to savepoint, we need not lock as we already
+ *          have the lock on the table. In cases like error or when
+ *          rolling back from the undo worker we need to have proper locks.
+ */
+void
+execute_undo_actions(UndoRecPtr from_urecptr, UndoRecPtr to_urecptr,
+					 bool nopartial, bool rewind, bool rellock)
+{
+	UnpackedUndoRecord *uur = NULL;
+	UndoRecPtr	urec_ptr, prev_urec_ptr;
+	UndoRecPtr	save_urec_ptr;
+	Oid			prev_reloid = InvalidOid;
+	ForkNumber	prev_fork = InvalidForkNumber;
+	BlockNumber prev_block = InvalidBlockNumber;
+	List	   *luinfo = NIL;
+	bool		more_undo;
+	int			options = 0;
+	TransactionId xid = InvalidTransactionId;
+	UndoRecInfo *urec_info;
+
+	Assert(from_urecptr != InvalidUndoRecPtr);
+	Assert(UndoRecPtrGetLogNo(from_urecptr) != UndoRecPtrGetLogNo(to_urecptr) ||
+		   from_urecptr >= to_urecptr);
+	/*
+	 * If the location up to which the rollback needs to be done is not
+	 * provided, then rollback the complete transaction.
+	 * FIXME: this won't work if undolog crossed the limit of 1TB, because
+	 * then from_urecptr and to_urecptr will be from different lognos.
+ */ + if (to_urecptr == InvalidUndoRecPtr) + { + UndoLogNumber logno = UndoRecPtrGetLogNo(from_urecptr); + to_urecptr = UndoLogGetLastXactStartPoint(logno); + } + + save_urec_ptr = urec_ptr = from_urecptr; + + if (nopartial) + { + uur = UndoFetchRecord(urec_ptr, InvalidBlockNumber, InvalidOffsetNumber, + InvalidTransactionId, NULL, NULL); + if (uur == NULL) + return; + + xid = uur->uur_xid; + UndoRecordRelease(uur); + uur = NULL; + + /* + * Grab the undo action apply lock before start applying the undo action + * this will prevent applying undo actions concurrently. If we do not + * get the lock that mean its already being applied concurrently or the + * discard worker might be pushing its request to the rollback hash + * table + */ + if (!ConditionTransactionUndoActionLock(xid)) + return; + } + + prev_urec_ptr = InvalidUndoRecPtr; + while (prev_urec_ptr != to_urecptr) + { + Oid reloid = InvalidOid; + uint16 urec_prevlen; + bool non_page; + + more_undo = true; + + prev_urec_ptr = urec_ptr; + + /* Fetch the undo record for given undo_recptr. */ + uur = UndoFetchRecord(urec_ptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, NULL, NULL); + + /* If there is no info block, this is not a page-based undo record. */ + non_page = uur && !(uur->uur_info & UREC_INFO_BLOCK); + + if (uur != NULL && !non_page) + reloid = uur->uur_reloid; + + xid = uur->uur_xid; + + if (non_page) + { + prev_reloid = InvalidOid; + urec_prevlen = uur->uur_prevlen; + save_urec_ptr = uur->uur_blkprev; + + /* + * Execute individual undo actions not associated with a page + * immediately. + */ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->uur = uur; + urec_info->urp = urec_ptr; + luinfo = lappend(luinfo, urec_info); + execute_undo_actions_page(luinfo, urec_ptr, reloid, xid, + InvalidBlockNumber, false, rellock, + 0); + pfree(urec_info); + urec_info = NULL; + list_free(luinfo); + luinfo = NIL; + UndoRecordRelease(uur); + + /* Follow undo chain until to_urecptr. 
*/ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + /* + * If the record is already discarded by undo worker or if the relation + * is dropped or truncated, then we cannot fetch record successfully. + * Hence, skip quietly. + * + * Note: reloid remains InvalidOid for a discarded record. + */ + else if (!OidIsValid(reloid)) + { + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + + urec_prevlen = uur->uur_prevlen; + + /* Release the just-fetched record */ + if (uur != NULL) + UndoRecordRelease(uur); + + /* The undo chain must continue till we reach to_urecptr */ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + else if (!OidIsValid(prev_reloid) || + (prev_reloid == reloid && + prev_fork == uur->uur_fork && + prev_block == uur->uur_block)) + { + /* Collect the undo records that belong to the same page. */ + prev_reloid = reloid; + prev_fork = uur->uur_fork; + prev_block = uur->uur_block; + + /* Prepare an undo record information element. 
*/ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->urp = urec_ptr; + urec_info->uur = uur; + + luinfo = lappend(luinfo, urec_info); + urec_prevlen = uur->uur_prevlen; + save_urec_ptr = uur->uur_blkprev; + + if (uur->uur_info & UREC_INFO_PAYLOAD_CONTAINS_SLOT) + options |= UNDO_ACTION_UPDATE_TPD; + + /* The undo chain must continue till we reach to_urecptr */ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + else + { + more_undo = true; + } + + /* + * If no more undo is left to be processed and we are rolling back the + * complete transaction, then we can consider that the undo chain for a + * block is complete. + * If the previous undo pointer in the page is invalid, then also the + * undo chain for the current block is completed. + */ + if (luinfo && + ((!more_undo && nopartial) || !UndoRecPtrIsValid(save_urec_ptr))) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, true, rellock, options); + /* Done with the page so reset the options. */ + options = 0; + } + else if (luinfo) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, false, rellock, options); + /* Done with the page so reset the options. */ + options = 0; + } + + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + + /* + * There are still more records to process, so keep moving backwards + * in the chain. + */ + if (more_undo) + { + /* Prepare an undo record information element. 
*/ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->urp = urec_ptr; + urec_info->uur = uur; + luinfo = lappend(luinfo, urec_info); + + prev_reloid = reloid; + prev_fork = uur->uur_fork; + prev_block = uur->uur_block; + save_urec_ptr = uur->uur_blkprev; + + if (uur->uur_info & UREC_INFO_PAYLOAD_CONTAINS_SLOT) + options |= UNDO_ACTION_UPDATE_TPD; + + /* + * Continue to process the records if this is not the last undo + * record in chain. + */ + urec_prevlen = uur->uur_prevlen; + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + else + break; + } + else + break; + } + + /* Apply the undo actions for the remaining records. */ + if (list_length(luinfo)) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, nopartial ? true : false, + rellock, options); + + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + } + + if (rewind) + { + /* Read the prevlen from the first record of this transaction. */ + uur = UndoFetchRecord(to_urecptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + /* + * If undo is already discarded before we rewind, then do nothing. + */ + if (uur == NULL) + return; + + + /* + * Rewind the insert location to start of this transaction. This is + * to avoid reapplying some intermediate undo. We do not need to wal + * log this information here, because if the system crash before we + * rewind the insert pointer then after recovery we can identify + * whether the undo is already applied or not from the slot undo record + * pointer. Also set the correct prevlen value (what we have fetched + * from the undo). 
+ */ + UndoLogRewind(to_urecptr, uur->uur_prevlen); + + UndoRecordRelease(uur); + } + + if (nopartial) + { + /* + * Set undo action apply completed in the transaction header if this is + * a main transaction and we have not rewound its undo. + */ + if (!rewind) + { + /* + * Undo action is applied so delete the hash table entry and release + * the undo action lock. + */ + RollbackHTRemoveEntry(from_urecptr); + + /* + * Prepare and update the progress of the undo action apply in the + * transaction header. + */ + PrepareUpdateUndoActionProgress(to_urecptr, 1); + + START_CRIT_SECTION(); + + /* Update the progress in the transaction header. */ + UndoRecordUpdateTransInfo(); + + /* WAL log the undo apply progress. */ + { + xl_undoapply_progress xlrec; + + xlrec.urec_ptr = to_urecptr; + xlrec.progress = 1; + + /* + * FIXME : We need to register undo buffers and set LSN for them + * that will be required for FPW of the undo buffers. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + + (void) XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS); + } + + END_CRIT_SECTION(); + UnlockReleaseUndoBuffers(); + } + + TransactionUndoActionLockRelease(xid); + } +} + +/* + * execute_undo_actions_page - Execute the undo actions for a page + * + * After applying all the undo actions for a page, we clear the transaction + * slot on a page if the undo chain for block is complete, otherwise just + * rewind the undo pointer to the last record for that block that precedes + * the last undo record for which action is replayed. + * + * luinfo - list of undo records (along with their location) for which undo + * action needs to be replayed. + * urec_ptr - undo record pointer to which we need to rewind. + * reloid - OID of relation on which undo actions needs to be applied. + * blkno - block number on which undo actions needs to be applied. + * blk_chain_complete - indicates whether the undo chain for block is + * complete. 
+ * nopartial - true if rollback is for complete transaction. If we are not + * rolling back the complete transaction then we need to apply the + * undo action for UNDO_INVALID_XACT_SLOT also because in such + * case we will rewind the insert undo location. + * rellock - if the caller already has the lock on the required relation, + * then this flag is false, i.e. we do not need to acquire any + * lock here. If the flag is true then we need to acquire lock + * here itself, because caller will not be having any lock. + * When we are performing undo actions for prepared transactions, + * or for rollback to savepoint, we need not to lock as we already + * have the lock on the table. In cases like error or when + * rollbacking from the undo worker we need to have proper locks. + * options - options for executing undo actions. + * + * returns true, if successfully applied the undo actions, otherwise, false. + */ +static bool +execute_undo_actions_page(List *luinfo, UndoRecPtr urec_ptr, Oid reloid, + TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool rellock, int options) +{ + UndoRecInfo *first; + + /* + * All records passed to us are for the same RMGR, so we just use the + * first record to dispatch. + */ + Assert(luinfo != NIL); + first = (UndoRecInfo *) linitial(luinfo); + + return RmgrTable[first->uur->uur_rmid].rm_undo(luinfo, urec_ptr, reloid, + xid, blkno, + blk_chain_complete, rellock, + options); +} + +/* + * To return the size of the hash-table for rollbacks. + */ +int +RollbackHTSize(void) +{ + return hash_estimate_size(ROLLBACK_HT_SIZE, sizeof(RollbackHashEntry)); +} + +/* + * To initialize the hash-table for rollbacks in shared memory + * for the given size. 
+ */
+void
+InitRollbackHashTable(void)
+{
+	HASHCTL info;
+	MemSet(&info, 0, sizeof(info));
+
+	info.keysize = sizeof(UndoRecPtr);
+	info.entrysize = sizeof(RollbackHashEntry);
+	info.hash = tag_hash;
+
+	RollbackHT = ShmemInitHash("Undo actions Lookup Table",
+							   ROLLBACK_HT_SIZE, ROLLBACK_HT_SIZE, &info,
+							   HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
+}
+
+/*
+ * To push the rollback requests from backend to the hash-table.
+ * Return true if the request is successfully added, else false
+ * and the caller may execute undo actions itself.
+ */
+bool
+PushRollbackReq(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr, Oid dbid)
+{
+	bool		found = false;
+	RollbackHashEntry *rh;
+
+	Assert(UndoRecPtrGetLogNo(start_urec_ptr) != UndoRecPtrGetLogNo(end_urec_ptr) ||
+		   start_urec_ptr >= end_urec_ptr);
+	/*
+	 * If the location up to which the rollback needs to be done is not
+	 * provided, then rollback the complete transaction.
+	 */
+	if (start_urec_ptr == InvalidUndoRecPtr)
+	{
+		UndoLogNumber logno = UndoRecPtrGetLogNo(end_urec_ptr);
+		start_urec_ptr = UndoLogGetLastXactStartPoint(logno);
+	}
+
+	Assert(UndoRecPtrIsValid(start_urec_ptr));
+
+	/* If there is no space to accommodate a new request, then we can't proceed. */
+	if (RollbackHTIsFull())
+		return false;
+
+	if(!UndoRecPtrIsValid(end_urec_ptr))
+	{
+		UndoLogNumber logno = UndoRecPtrGetLogNo(start_urec_ptr);
+		end_urec_ptr = UndoLogGetLastXactStartPoint(logno);
+	}
+
+	LWLockAcquire(RollbackHTLock, LW_EXCLUSIVE);
+
+	rh = (RollbackHashEntry *) hash_search(RollbackHT, &start_urec_ptr,
+										   HASH_ENTER_NULL, &found);
+	if (!rh)
+	{
+		LWLockRelease(RollbackHTLock);
+		return false;
+	}
+	/* We shouldn't try to push the same rollback request again. */
+	if (!found)
+	{
+		rh->start_urec_ptr = start_urec_ptr;
+		rh->end_urec_ptr = end_urec_ptr;
+		rh->dbid = (dbid == InvalidOid) ?
MyDatabaseId : dbid;
+	}
+	LWLockRelease(RollbackHTLock);
+
+	return true;
+}
+
+/*
+ * To perform the undo actions for the transactions whose rollback
+ * requests are in the hash table. Sequentially, scan the hash-table
+ * and perform the undo-actions for the respective transactions.
+ * Once the undo-actions are applied, remove the entry from the
+ * hash table.
+ */
+void
+RollbackFromHT(Oid dbid)
+{
+	UndoRecPtr	start[ROLLBACK_HT_SIZE];
+	UndoRecPtr	end[ROLLBACK_HT_SIZE];
+	RollbackHashEntry *rh;
+	HASH_SEQ_STATUS status;
+	int			i = 0;
+
+	/* Fetch the rollback requests */
+	LWLockAcquire(RollbackHTLock, LW_SHARED);
+
+	Assert(hash_get_num_entries(RollbackHT) <= ROLLBACK_HT_SIZE);
+	hash_seq_init(&status, RollbackHT);
+	while (RollbackHT != NULL &&
+		   (rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (rh->dbid == dbid)
+		{
+			start[i] = rh->start_urec_ptr;
+			end[i] = rh->end_urec_ptr;
+			i++;
+		}
+	}
+
+	LWLockRelease(RollbackHTLock);
+
+	/* Execute the rollback requests */
+	while(--i >= 0)
+	{
+		Assert(UndoRecPtrIsValid(start[i]));
+		Assert(UndoRecPtrIsValid(end[i]));
+
+		StartTransactionCommand();
+		execute_undo_actions(start[i], end[i], true, false, true);
+		CommitTransactionCommand();
+	}
+}
+
+/*
+ * Remove the rollback request entry from the rollback hash table.
+ */
+static void
+RollbackHTRemoveEntry(UndoRecPtr start_urec_ptr)
+{
+	LWLockAcquire(RollbackHTLock, LW_EXCLUSIVE);
+
+	hash_search(RollbackHT, &start_urec_ptr, HASH_REMOVE, NULL);
+
+	LWLockRelease(RollbackHTLock);
+}
+
+/*
+ * To check whether the rollback hash table is full. This is required
+ * because we do not want to expose RollbackHT in xact.c, where it is
+ * required to ensure that we push the requests only when there is some
+ * space in the hash-table.
+ */ +bool +RollbackHTIsFull(void) +{ + bool result = false; + + LWLockAcquire(RollbackHTLock, LW_SHARED); + + if (hash_get_num_entries(RollbackHT) >= ROLLBACK_HT_SIZE) + result = true; + + LWLockRelease(RollbackHTLock); + + return result; +} + +/* + * Get database list from the rollback hash table. + */ +List * +RollbackHTGetDBList() +{ + HASH_SEQ_STATUS status; + RollbackHashEntry *rh; + List *dblist = NIL; + + /* Fetch the rollback requests */ + LWLockAcquire(RollbackHTLock, LW_SHARED); + + hash_seq_init(&status, RollbackHT); + while (RollbackHT != NULL && + (rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL) + dblist = list_append_unique_oid(dblist, rh->dbid); + + LWLockRelease(RollbackHTLock); + + return dblist; +} + +/* + * ConditionTransactionUndoActionLock + * + * Insert a lock showing that the undo action for given transaction is in + * progress. This is only done for the main transaction not for the + * sub-transaction. + */ +bool +ConditionTransactionUndoActionLock(TransactionId xid) +{ + LOCKTAG tag; + + SET_LOCKTAG_TRANSACTION_UNDOACTION(tag, xid); + + if (LOCKACQUIRE_NOT_AVAIL == LockAcquire(&tag, ExclusiveLock, false, true)) + return false; + else + return true; +} + +/* + * TransactionUndoActionLockRelease + * + * Delete the lock showing that the undo action given transaction ID is in + * progress. + */ +void +TransactionUndoActionLockRelease(TransactionId xid) +{ + LOCKTAG tag; + + SET_LOCKTAG_TRANSACTION_UNDOACTION(tag, xid); + + LockRelease(&tag, ExclusiveLock, false); +} diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c new file mode 100644 index 0000000..546f43d --- /dev/null +++ b/src/backend/access/undo/undoactionxlog.c @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * undoactionxlog.c + * WAL replay logic for undo actions. 
+ * + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/undoactionxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" +#include "access/undoinsert.h" +#include "access/visibilitymap.h" +#include "access/xlog.h" +#include "access/xlogutils.h" + +/* + * Replay of undo apply progress. + */ +static void +undo_xlog_apply_progress(XLogReaderState *record) +{ + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record); + + /* Update the progress in the transaction header. */ + PrepareUpdateUndoActionProgress(xlrec->urec_ptr, xlrec->progress); + UndoRecordUpdateTransInfo(); +} + +void +undoaction_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_UNDO_APPLY_PROGRESS: + undo_xlog_apply_progress(record); + break; + default: + elog(PANIC, "undoaction_redo: unknown op code %u", info); + } +} diff --git a/src/backend/access/undo/undodiscard.c b/src/backend/access/undo/undodiscard.c new file mode 100644 index 0000000..9d55588 --- /dev/null +++ b/src/backend/access/undo/undodiscard.c @@ -0,0 +1,458 @@ +/*------------------------------------------------------------------------- + * + * undodiscard.c + * discard undo records + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undodiscard.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/xlog.h" +#include "access/xact.h" +#include "access/undolog.h" +#include "access/undodiscard.h" +#include "catalog/pg_tablespace.h" +#include "miscadmin.h" +#include 
"storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/proc.h" +#include "utils/resowner.h" +#include "postmaster/undoloop.h" + +static UndoRecPtr FetchLatestUndoPtrForXid(UndoRecPtr urecptr, + UnpackedUndoRecord *uur_start, + UndoLogControl *log); + +/* + * Discard the undo for the log + * + * Search the undo log, get the start record for each transaction until we get + * the transaction with xid >= xmin or an invalid xid. Then call undolog + * routine to discard upto that point and update the memory structure for the + * log slot. We set the hibernate flag if we do not have any undo logs, this + * flag is passed to the undo worker wherein it determines if system is idle + * and it should sleep for sometime. + * + * Return the oldest xid remaining in this undo log (which should be >= xmin, + * since we'll discard everything older). Return InvalidTransactionId if the + * undo log is empty. + */ +static TransactionId +UndoDiscardOneLog(UndoLogControl *log, TransactionId xmin, bool *hibernate) +{ + UndoRecPtr undo_recptr, next_insert, from_urecptr; + UndoRecPtr next_urecptr = InvalidUndoRecPtr; + UnpackedUndoRecord *uur = NULL; + bool need_discard = false; + bool log_complete = false; + TransactionId undoxid = InvalidTransactionId; + TransactionId xid = log->oldest_xid; + TransactionId latest_discardxid = InvalidTransactionId; + uint32 epoch = 0; + + undo_recptr = log->oldest_data; + + /* There might not be any undo log and hibernation might be needed. */ + *hibernate = true; + + /* Loop until we run out of discardable transactions. */ + do + { + bool pending_abort = false; + + next_insert = UndoLogGetNextInsertPtr(log->logno, xid); + + /* + * If the next insert location in the undo log is same as the oldest + * data for the log then there is nothing more to discard in this log + * so discard upto this point. 
+ */ + if (next_insert == undo_recptr) + { + /* + * If the discard location and the insert location is same then + * there is nothing to discard. + */ + if (undo_recptr == log->oldest_data) + break; + else + log_complete = true; + } + else + { + /* Fetch the undo record for given undo_recptr. */ + uur = UndoFetchRecord(undo_recptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + + Assert(uur != NULL); + + if (!TransactionIdDidCommit(uur->uur_xid) && + TransactionIdPrecedes(uur->uur_xid, xmin) && + uur->uur_progress == 0) + { + /* + * At the time of recovery, we might not have a valid next undo + * record pointer and in that case we'll calculate the location + * of from pointer using the last record of next insert + * location. + */ + if (ConditionTransactionUndoActionLock(uur->uur_xid)) + { + TransactionId xid = uur->uur_xid; + UndoLogControl *log = NULL; + UndoLogNumber logno; + + logno = UndoRecPtrGetLogNo(undo_recptr); + log = UndoLogGet(logno, false); + + /* + * If the corresponding log got rewinded to a location + * prior to undo_recptr, the undo actions are already + * applied. + */ + if (log->meta.insert > undo_recptr) + { + UndoRecordRelease(uur); + + /* Fetch the undo record under undo action lock. */ + uur = UndoFetchRecord(undo_recptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + /* + * If the undo actions for the aborted transaction is + * already applied then continue discarding the undo log + * otherwise discard till current point and stop processing + * this undo log. + * Also, check this is indeed the transaction id we're + * looking for. It is possible that after rewinding + * some other transaction has inserted an undo record. 
+ */ + if (uur->uur_xid == xid && uur->uur_progress == 0) + { + from_urecptr = FetchLatestUndoPtrForXid(undo_recptr, uur, log); + (void)PushRollbackReq(from_urecptr, undo_recptr, uur->uur_dbid); + pending_abort = true; + } + } + + TransactionUndoActionLockRelease(xid); + } + else + pending_abort = true; + } + + next_urecptr = uur->uur_next; + undoxid = uur->uur_xid; + xid = undoxid; + epoch = uur->uur_xidepoch; + } + + /* we can discard upto this point. */ + if (TransactionIdFollowsOrEquals(undoxid, xmin) || + next_urecptr == SpecialUndoRecPtr || + UndoRecPtrGetLogNo(next_urecptr) != log->logno || + log_complete || pending_abort) + { + /* Hey, I got some undo log to discard, can not hibernate now. */ + *hibernate = false; + + if (uur != NULL) + UndoRecordRelease(uur); + + /* + * If Transaction id is smaller than the xmin that means this must + * be the last transaction in this undo log, so we need to get the + * last insert point in this undo log and discard till that point. + * Also, if the transaction has pending abort, we stop discarding + * undo from the same location. + */ + if (TransactionIdPrecedes(undoxid, xmin) && !pending_abort) + { + UndoRecPtr next_insert = InvalidUndoRecPtr; + + /* + * Get the last insert location for this transaction Id, if it + * returns invalid pointer that means there is new transaction + * has started for this undolog. So we need to refetch the undo + * and continue the process. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, undoxid); + if (!UndoRecPtrIsValid(next_insert)) + continue; + + undo_recptr = next_insert; + need_discard = true; + epoch = 0; + latest_discardxid = undoxid; + undoxid = InvalidTransactionId; + } + + LWLockAcquire(&log->discard_lock, LW_EXCLUSIVE); + + /* + * If no more pending undo logs then set the oldest transaction to + * InvalidTransactionId. 
+ */ + if (log_complete) + { + log->oldest_xid = InvalidTransactionId; + log->oldest_xidepoch = 0; + } + else + { + log->oldest_xid = undoxid; + log->oldest_xidepoch = epoch; + } + + log->oldest_data = undo_recptr; + LWLockRelease(&log->discard_lock); + + if (need_discard) + UndoLogDiscard(undo_recptr, latest_discardxid); + + break; + } + + /* + * This transaction is smaller than the xmin so lets jump to the next + * transaction. + */ + undo_recptr = next_urecptr; + latest_discardxid = undoxid; + + if(uur != NULL) + { + UndoRecordRelease(uur); + uur = NULL; + } + + need_discard = true; + } while (true); + + return undoxid; +} + +/* + * Discard the undo for all the transaction whose xid is smaller than xmin + * + * Check the DiscardInfo memory array for each slot (every undo log) , process + * the undo log for all the slot which have xid smaller than xmin or invalid + * xid. Fetch the record from the undo log transaction by transaction until we + * find the xid which is not smaller than xmin. + */ +void +UndoDiscard(TransactionId oldestXmin, bool *hibernate) +{ + TransactionId oldestXidHavingUndo = oldestXmin; + uint64 epoch = GetEpochForXid(oldestXmin); + UndoLogControl *log = NULL; + + /* + * TODO: Ideally we'd arrange undo logs so that we can efficiently find + * those with oldest_xid < oldestXmin, but for now we'll just scan all of + * them. + */ + while ((log = UndoLogNext(log))) + { + TransactionId oldest_xid = InvalidTransactionId; + + /* We can't process temporary undo logs. */ + if (log->meta.persistence == UNDO_TEMP) + continue; + + /* + * If the first xid of the undo log is smaller than the xmin the try + * to discard the undo log. + */ + if (TransactionIdPrecedes(log->oldest_xid, oldestXmin)) + { + /* + * If the XID in the discard entry is invalid then start scanning + * from the first valid undorecord in the log. 
+ */ + if (!TransactionIdIsValid(log->oldest_xid)) + { + bool full = false; + UndoRecPtr urp = UndoLogGetFirstValidRecord(log, &full); + + if (!UndoRecPtrIsValid(urp)) + { + /* + * There is nothing to be discarded. If there is also no + * more free space, then a call to UndoLogDiscard() will + * discard it the undo log completely and free up the + * UndoLogControl slot. + */ + if (full) + UndoLogDiscard(MakeUndoRecPtr(log->meta.logno, + log->meta.discard), + InvalidTransactionId); + continue; + } + + LWLockAcquire(&log->discard_lock, LW_SHARED); + log->oldest_data = urp; + LWLockRelease(&log->discard_lock); + } + + /* Process the undo log. */ + oldest_xid = UndoDiscardOneLog(log, oldestXmin, hibernate); + } + + if (TransactionIdIsValid(oldest_xid) && + TransactionIdPrecedes(oldest_xid, oldestXidHavingUndo)) + { + oldestXidHavingUndo = oldest_xid; + epoch = GetEpochForXid(oldest_xid); + } + } + + /* + * Update the oldestXidWithEpochHavingUndo in the shared memory. + * + * XXX In future if multiple worker can perform discard then we may need + * to use compare and swap for updating the shared memory value. + */ + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + MakeEpochXid(epoch, oldestXidHavingUndo)); +} + +/* + * Fetch the latest urec pointer for the transaction. + */ +UndoRecPtr +FetchLatestUndoPtrForXid(UndoRecPtr urecptr, UnpackedUndoRecord *uur_start, + UndoLogControl *log) +{ + UndoRecPtr next_urecptr, from_urecptr; + uint16 prevlen; + UndoLogOffset next_insert; + UnpackedUndoRecord *uur; + bool refetch = false; + + uur = uur_start; + + while (true) + { + /* fetch the undo record again if required. 
*/ + if (refetch) + { + uur = UndoFetchRecord(urecptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + refetch = false; + } + + next_urecptr = uur->uur_next; + prevlen = UndoLogGetPrevLen(log->logno); + + /* + * If this is the last transaction in the log then calculate the latest + * urec pointer using next insert location of the undo log. Otherwise, + * calculate using next transaction's start pointer. + */ + if (uur->uur_next == SpecialUndoRecPtr) + { + /* + * While fetching the next insert location if the new transaction + * has already started in this log then lets re-fetch the undo + * record. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid); + if (!UndoRecPtrIsValid(next_insert)) + { + if (uur != uur_start) + UndoRecordRelease(uur); + refetch = true; + continue; + } + + from_urecptr = UndoGetPrevUndoRecptr(next_insert, prevlen); + break; + } + else if ((UndoRecPtrGetLogNo(next_urecptr) != log->logno) && + UndoLogIsDiscarded(next_urecptr)) + { + /* + * If next_urecptr is in different undolog and its already discarded + * that means the undo actions for this transaction which are in the + * next log has already been executed and we only need to execute + * which are remaining in this log. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid); + + Assert(UndoRecPtrIsValid(next_insert)); + from_urecptr = UndoGetPrevUndoRecptr(next_insert, prevlen); + break; + } + else + { + UnpackedUndoRecord *next_uur; + + next_uur = UndoFetchRecord(next_urecptr, + InvalidBlockNumber, + InvalidOffsetNumber, + InvalidTransactionId, + NULL, NULL); + /* + * If the next_urecptr is in the same log then calculate the + * from pointer using prevlen. 
+ */ + if (UndoRecPtrGetLogNo(next_urecptr) == log->logno) + { + from_urecptr = + UndoGetPrevUndoRecptr(next_urecptr, next_uur->uur_prevlen); + UndoRecordRelease(next_uur); + break; + } + else + { + /* + * The transaction is overflowed to the next log, so restart + * the processing from then next log. + */ + log = UndoLogGet(UndoRecPtrGetLogNo(next_urecptr), false); + if (uur != uur_start) + UndoRecordRelease(uur); + uur = next_uur; + continue; + } + + UndoRecordRelease(next_uur); + } + } + + if (uur != uur_start) + UndoRecordRelease(uur); + + return from_urecptr; +} + +/* + * Discard the undo logs for temp tables. + */ +void +TempUndoDiscard(UndoLogNumber logno) +{ + UndoLogControl *log = UndoLogGet(logno, false); + + /* + * Discard the undo log for temp table only. Ensure that there is + * something to be discarded there. + */ + Assert (log->meta.persistence == UNDO_TEMP); + + /* Process the undo log. */ + UndoLogDiscard(MakeUndoRecPtr(log->logno, log->meta.insert), + InvalidTransactionId); +} diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c index 7c7e4ff..16016b8 100644 --- a/src/backend/access/undo/undoinsert.c +++ b/src/backend/access/undo/undoinsert.c @@ -150,7 +150,6 @@ static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec, UndoPersistence persistence); static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr, bool log_switched); -static void UndoRecordUpdateTransInfo(void); static int UndoGetBufferSlot(RelFileNode rnode, BlockNumber blk, ReadBufferMode rbm, UndoPersistence persistence); @@ -293,12 +292,59 @@ UndoRecordPrepareTransInfo(UndoRecPtr urecptr, bool log_switched) } /* + * Update the progress of the undo record in the transaction header. 
+ */ +void +PrepareUpdateUndoActionProgress(UndoRecPtr urecptr, int progress) +{ + Buffer buffer = InvalidBuffer; + BlockNumber cur_blk; + RelFileNode rnode; + UndoLogNumber logno = UndoRecPtrGetLogNo(urecptr); + UndoLogControl *log; + Page page; + int already_decoded = 0; + int starting_byte; + int bufidx; + int index = 0; + + log = UndoLogGet(logno, false); + + if (log->meta.persistence == UNDO_TEMP) + return; + + UndoRecPtrAssignRelFileNode(rnode, urecptr); + cur_blk = UndoRecPtrGetBlockNum(urecptr); + starting_byte = UndoRecPtrGetPageOffset(urecptr); + + while (true) + { + bufidx = UndoGetBufferSlot(rnode, cur_blk, + RBM_NORMAL, + log->meta.persistence); + xact_urec_info.idx_undo_buffers[index++] = bufidx; + buffer = undo_buffer[bufidx].buf; + page = BufferGetPage(buffer); + + if (UnpackUndoRecord(&xact_urec_info.uur, page, starting_byte, + &already_decoded, true)) + break; + + starting_byte = UndoLogBlockHeaderSize; + cur_blk++; + } + + xact_urec_info.urecptr = urecptr; + xact_urec_info.uur.uur_progress = progress; +} + +/* * Overwrite the first undo record of the previous transaction to update its * next pointer. This will just insert the already prepared record by * UndoRecordPrepareTransInfo. This must be called under the critical section. * This will just overwrite the undo header not the data. 
*/ -static void +void UndoRecordUpdateTransInfo(void) { UndoLogNumber logno = UndoRecPtrGetLogNo(xact_urec_info.urecptr); diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c index 73076dc..3cd4853 100644 --- a/src/backend/access/undo/undorecord.c +++ b/src/backend/access/undo/undorecord.c @@ -90,6 +90,7 @@ InsertUndoRecord(UnpackedUndoRecord *uur, Page page, */ if (*already_written == 0) { + work_hdr.urec_rmid = uur->uur_rmid; work_hdr.urec_type = uur->uur_type; work_hdr.urec_info = uur->uur_info; work_hdr.urec_prevlen = uur->uur_prevlen; @@ -114,6 +115,7 @@ InsertUndoRecord(UnpackedUndoRecord *uur, Page page, * We should have been passed the same record descriptor as before, or * caller has messed up. */ + Assert(work_hdr.urec_rmid == uur->uur_rmid); Assert(work_hdr.urec_type == uur->uur_type); Assert(work_hdr.urec_info == uur->uur_info); Assert(work_hdr.urec_prevlen == uur->uur_prevlen); @@ -276,6 +278,7 @@ UnpackUndoRecord(UnpackedUndoRecord *uur, Page page, int starting_byte, &my_bytes_decoded, already_decoded, false)) return false; + uur->uur_rmid = work_hdr.urec_rmid; uur->uur_type = work_hdr.urec_type; uur->uur_info = work_hdr.urec_info; uur->uur_prevlen = work_hdr.urec_prevlen; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 25b3b03..8d98d3f 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1381,6 +1381,7 @@ vac_truncate_clog(TransactionId frozenXID, MultiXactId lastSaneMinMulti) { TransactionId nextXID = ReadNewTransactionId(); + TransactionId oldestXidHavingUndo; Relation relation; HeapScanDesc scan; HeapTuple tuple; @@ -1475,6 +1476,16 @@ vac_truncate_clog(TransactionId frozenXID, return; /* + * We can't truncate the clog for transactions that still have undo. The + * oldestXidHavingUndo will be only valid for zheap storage engine, so it + * won't impact any other storage engine. 
+ */ + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + if (TransactionIdIsValid(oldestXidHavingUndo)) + frozenXID = Min(frozenXID, oldestXidHavingUndo); + + /* * Advance the oldest value for commit timestamps before truncating, so * that if a user requests a timestamp for a transaction we're truncating * away right after this point, they get NULL instead of an ugly "file not diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..9ce6ff0 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o discardworker.o fork_process.o \ + pgarch.o pgstat.o postmaster.o startup.o syslogger.o undoworker.o walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index d2b695e..49df516 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -20,7 +20,9 @@ #include "pgstat.h" #include "port/atomics.h" #include "postmaster/bgworker_internals.h" +#include "postmaster/discardworker.h" #include "postmaster/postmaster.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -129,6 +131,15 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "UndoLauncherMain", UndoLauncherMain + }, + { + "UndoWorkerMain", UndoWorkerMain + }, + { + "DiscardWorkerMain", DiscardWorkerMain } }; diff --git a/src/backend/postmaster/discardworker.c b/src/backend/postmaster/discardworker.c new file mode 100644 index 0000000..e4c6719 --- /dev/null +++ 
b/src/backend/postmaster/discardworker.c @@ -0,0 +1,170 @@ +/*------------------------------------------------------------------------- + * + * discardworker.c + * The undo discard worker for asynchronous undo management. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/postmaster/discardworker.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include + +/* These are always necessary for a bgworker. */ +#include "access/transam.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" + +#include "access/undodiscard.h" +#include "pgstat.h" +#include "postmaster/discardworker.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/resowner.h" + +static void undoworker_sigterm_handler(SIGNAL_ARGS); + +/* max sleep time between cycles (100 milliseconds) */ +#define MIN_NAPTIME_PER_CYCLE 100L +#define DELAYED_NAPTIME 10 * MIN_NAPTIME_PER_CYCLE +#define MAX_NAPTIME_PER_CYCLE 100 * MIN_NAPTIME_PER_CYCLE + +static bool got_SIGTERM = false; +static bool hibernate = false; +static long wait_time = MIN_NAPTIME_PER_CYCLE; + +/* SIGTERM: set flag to exit at next convenient time */ +static void +undoworker_sigterm_handler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); +} + +/* + * DiscardWorkerRegister -- Register a undo discard worker. + */ +void +DiscardWorkerRegister(void) +{ + BackgroundWorker bgw; + + /* TODO: This should be configurable. 
*/ + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_name, BGW_MAXLEN, "discard worker"); + sprintf(bgw.bgw_library_name, "postgres"); + sprintf(bgw.bgw_function_name, "DiscardWorkerMain"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * DiscardWorkerMain -- Main loop for the undo discard worker. + */ +void +DiscardWorkerMain(Datum main_arg) +{ + ereport(LOG, + (errmsg("discard worker started"))); + + /* Establish signal handlers. */ + pqsignal(SIGTERM, undoworker_sigterm_handler); + BackgroundWorkerUnblockSignals(); + + /* Make it easy to identify our processes. */ + SetConfigOption("application_name", MyBgworkerEntry->bgw_name, + PGC_USERSET, PGC_S_SESSION); + + /* + * Create resource owner for discard worker as it need to read the undo + * records outside the transaction blocks which intern access buffer read + * routine. + */ + CreateAuxProcessResourceOwner(); + + /* Enter main loop */ + while (!got_SIGTERM) + { + int rc; + TransactionId OldestXmin, oldestXidHavingUndo; + + OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT); + + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + + /* + * Call the discard routine if there oldestXidHavingUndo is lagging + * behind OldestXmin. + */ + if (OldestXmin != InvalidTransactionId && + TransactionIdPrecedes(oldestXidHavingUndo, OldestXmin)) + { + UndoDiscard(OldestXmin, &hibernate); + + /* + * If we got some undo logs to discard or discarded something, + * then reset the wait_time as we have got work to do. + * Note that if there are some undologs that cannot be discarded, + * then above condition will remain unsatisified till oldestXmin + * remains unchanged and the wait_time will not reset in that case. 
+ */ + if (!hibernate) + wait_time = MIN_NAPTIME_PER_CYCLE; + } + + /* Wait for more work. */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wait_time, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN); + + ResetLatch(&MyProc->procLatch); + + /* + * Increase the wait_time based on the length of inactivity. If wait_time + * is within one second, then increment it by 100 ms at a time. Henceforth, + * increment it one second at a time, till it reaches ten seconds. Never + * increase the wait_time more than ten seconds, it will be too much of + * waiting otherwise. + */ + if (rc & WL_TIMEOUT && hibernate) + { + wait_time += (wait_time < DELAYED_NAPTIME ? + MIN_NAPTIME_PER_CYCLE : DELAYED_NAPTIME); + if (wait_time > MAX_NAPTIME_PER_CYCLE) + wait_time = MAX_NAPTIME_PER_CYCLE; + } + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } + + ReleaseAuxProcessResources(true); + + /* we're done */ + ereport(LOG, + (errmsg("discard worker shutting down"))); + + proc_exit(0); +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 9d717d9..81f0698 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3506,6 +3506,12 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_SYSLOGGER_MAIN: event_name = "SysLoggerMain"; break; + case WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN: + event_name = "UndoDiscardWorkerMain"; + break; + case WAIT_EVENT_UNDO_LAUNCHER_MAIN: + event_name = "UndoLauncherMain"; + break; case WAIT_EVENT_WAL_RECEIVER_MAIN: event_name = "WalReceiverMain"; break; @@ -3918,7 +3924,6 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_UNDO_FILE_SYNC: event_name = "UndoFileSync"; break; - case WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ: event_name = "WALSenderTimelineHistoryRead"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a33a131..9954b21 100644 --- 
a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -111,10 +111,12 @@ #include "port/pg_bswap.h" #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" +#include "postmaster/discardworker.h" #include "postmaster/fork_process.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -991,6 +993,11 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + UndoLauncherRegister(); + + /* Register the Undo Discard worker. */ + DiscardWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/postmaster/undoworker.c b/src/backend/postmaster/undoworker.c new file mode 100644 index 0000000..bf72203 --- /dev/null +++ b/src/backend/postmaster/undoworker.c @@ -0,0 +1,664 @@ +/*------------------------------------------------------------------------- + * + * undoworker.c + * undo launcher and undo worker process. 
+ * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/postmaster/undoworker.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/heapam.h" +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "access/xact.h" + +#include "catalog/indexing.h" +#include "catalog/pg_database.h" + +#include "libpq/pqsignal.h" + +#include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" +#include "postmaster/postmaster.h" +#include "postmaster/undoloop.h" +#include "postmaster/undoworker.h" + +#include "replication/slot.h" +#include "replication/worker_internal.h" + +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" + +#include "tcop/tcopprot.h" + +#include "utils/fmgroids.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* max sleep time between cycles (100 milliseconds) */ +#define DEFAULT_NAPTIME_PER_CYCLE 100L +#define DEFAULT_RETRY_NAPTIME 50L + +int max_undo_workers = 5; + +typedef struct UndoApplyWorker +{ + /* Indicates if this slot is used or free. */ + bool in_use; + + /* Increased everytime the slot is taken by new worker. */ + uint16 generation; + + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* Database id to connect to. */ + Oid dbid; +} UndoApplyWorker; + +UndoApplyWorker *MyUndoWorker = NULL; + +typedef struct UndoApplyCtxStruct +{ + /* Supervisor process. */ + pid_t launcher_pid; + + /* Background workers. 
*/ + UndoApplyWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} UndoApplyCtxStruct; + +UndoApplyCtxStruct *UndoApplyCtx; + +static void undo_worker_onexit(int code, Datum arg); +static void undo_worker_cleanup(UndoApplyWorker *worker); + +static volatile sig_atomic_t got_SIGHUP = false; + +/* + * Wait for a background worker to start up and attach to the shmem context. + * + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. + */ +static void +WaitForUndoWorkerAttach(UndoApplyWorker *worker, + uint16 generation, + BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(UndoWorkerLock, LW_SHARED); + + /* Worker either died or has started; no need to do anything. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(UndoWorkerLock); + return; + } + + LWLockRelease(UndoWorkerLock); + + /* Check if worker has died before attaching, and clean up after it. */ + status = GetBackgroundWorkerPid(handle, &pid); + + if (status == BGWH_STOPPED) + { + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return; + } + + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. But we don't expect to have to wait long. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + return; +} + +/* + * Get dbid from the worker slot. + */ +static Oid +slot_get_dbid(int slot) +{ + Oid dbid; + + /* Block concurrent access. 
*/ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty,", + slot))); + } + + dbid = MyUndoWorker->dbid; + + LWLockRelease(UndoWorkerLock); + + return dbid; +} + +/* + * Attach to a slot. + */ +static void +undo_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty, cannot attach", + slot))); + } + + if (MyUndoWorker->proc) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is already used by " + "another worker, cannot attach", slot))); + } + + MyUndoWorker->proc = MyProc; + before_shmem_exit(undo_worker_onexit, (Datum) 0); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Walks the workers array and searches for one that matches given + * dbid. + */ +static UndoApplyWorker * +undo_worker_find(Oid dbid) +{ + int i; + UndoApplyWorker *res = NULL; + + Assert(LWLockHeldByMe(UndoWorkerLock)); + + /* Search for attached worker for a given db id. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (w->in_use && w->dbid == dbid) + { + res = w; + break; + } + } + + return res; +} + +/* + * Check whether the dbid exist or not. + * + * Refer comments from GetDatabaseTupleByOid. + * FIXME: Should we expose GetDatabaseTupleByOid and directly use it. 
+ */ +static bool +dbid_exist(Oid dboid) +{ + HeapTuple tuple; + Relation relation; + SysScanDesc scan; + ScanKeyData key[1]; + bool result = false; + /* + * form a scan key + */ + ScanKeyInit(&key[0], + Anum_pg_database_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(dboid)); + + relation = heap_open(DatabaseRelationId, AccessShareLock); + scan = systable_beginscan(relation, DatabaseOidIndexId, + criticalSharedRelcachesBuilt, + NULL, + 1, key); + + tuple = systable_getnext(scan); + + if (HeapTupleIsValid(tuple)) + result = true; + + /* all done */ + systable_endscan(scan); + heap_close(relation, AccessShareLock); + + return result; +} + +/* + * Start new undo apply background worker, if possible otherwise return false. + */ +static bool +undo_worker_launch(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + int i; + int slot = 0; + UndoApplyWorker *worker = NULL; + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (!w->in_use) + { + worker = w; + slot = i; + break; + } + } + + /* There are no more free worker slots */ + if (worker == NULL) + return false; + + /* Prepare the worker slot. */ + worker->in_use = true; + worker->proc = NULL; + worker->dbid = dbid; + worker->generation++; + + generation = worker->generation; + LWLockRelease(UndoWorkerLock); + + /* Register the new dynamic worker. 
*/ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoWorkerMain"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "undo apply worker"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "undo apply worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(slot); + + StartTransactionCommand(); + /* Check the database exists or not. */ + if (!dbid_exist(dbid)) + { + CommitTransactionCommand(); + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return true; + } + + /* + * Acquire database object lock before launching the worker so that it + * doesn't get dropped while worker is connecting to the database. + */ + LockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + + /* Recheck whether database still exists or not. */ + if (!dbid_exist(dbid)) + { + CommitTransactionCommand(); + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return true; + } + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + + UnlockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + CommitTransactionCommand(); + + return false; + } + + /* Now wait until it attaches. */ + WaitForUndoWorkerAttach(worker, generation, bgw_handle); + + /* + * By this point the undo-worker has already connected to the database so we + * can release the database lock. 
+ */ + UnlockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + CommitTransactionCommand(); + + return true; +} + +/* + * Detach the worker (cleans up the worker info). + */ +static void +undo_worker_detach(void) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + undo_worker_cleanup(MyUndoWorker); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Clean up worker info. + */ +static void +undo_worker_cleanup(UndoApplyWorker *worker) +{ + Assert(LWLockHeldByMeInMode(UndoWorkerLock, LW_EXCLUSIVE)); + + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; +} + +/* + * Cleanup function for undo worker launcher. + * + * Called on undo worker launcher exit. + */ +static void +undo_launcher_onexit(int code, Datum arg) +{ + UndoApplyCtx->launcher_pid = 0; +} + +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +undo_launcher_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Cleanup function. + * + * Called on logical replication worker exit. + */ +static void +undo_worker_onexit(int code, Datum arg) +{ + undo_worker_detach(); +} + +/* + * UndoLauncherShmemSize + * Compute space needed for undo launcher shared memory + */ +Size +UndoLauncherShmemSize(void) +{ + Size size; + + /* + * Need the fixed struct and the array of LogicalRepWorker. + */ + size = sizeof(UndoApplyCtxStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_undo_workers, + sizeof(UndoApplyWorker))); + return size; +} + +/* + * UndoLauncherRegister + * Register a background worker running the undo worker launcher. 
+ */ +void +UndoLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_undo_workers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "undo worker launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "undo worker launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * UndoLauncherShmemInit + * Allocate and initialize undo worker launcher shared memory + */ +void +UndoLauncherShmemInit(void) +{ + bool found; + + UndoApplyCtx = (UndoApplyCtxStruct *) + ShmemInitStruct("Undo Worker Launcher Data", + UndoLauncherShmemSize(), + &found); + + if (!found) + memset(UndoApplyCtx, 0, UndoLauncherShmemSize()); +} + +/* + * Main loop for the undo worker launcher process. + */ +void +UndoLauncherMain(Datum main_arg) +{ + MemoryContext tmpctx; + MemoryContext oldctx; + + ereport(DEBUG1, + (errmsg("undo launcher started"))); + + before_shmem_exit(undo_launcher_onexit, (Datum) 0); + + Assert(UndoApplyCtx->launcher_pid == 0); + UndoApplyCtx->launcher_pid = MyProcPid; + + /* Establish signal handlers. */ + pqsignal(SIGHUP, undo_launcher_sighup); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Establish connection to nailed catalogs (we only ever access + * pg_subscription). + */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Use temporary context for the database list and worker info. */ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Undo worker Launcher context", + ALLOCSET_DEFAULT_SIZES); + /* Enter main loop */ + for (;;) + { + int rc; + List *dblist; + ListCell *l; + + CHECK_FOR_INTERRUPTS(); + + /* switch to the temp context. 
*/ + oldctx = MemoryContextSwitchTo(tmpctx); + dblist = RollbackHTGetDBList(); + + foreach(l, dblist) + { + UndoApplyWorker *w; + Oid dbid = lfirst_oid(l); + + LWLockAcquire(UndoWorkerLock, LW_SHARED); + w = undo_worker_find(dbid); + LWLockRelease(UndoWorkerLock); + + if (w == NULL) + { +retry: + if (!undo_worker_launch(dbid)) + { + /* Could not launch the worker, retry after sometime, */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_RETRY_NAPTIME, + WAIT_EVENT_UNDO_LAUNCHER_MAIN); + goto retry; + } + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + + /* Clean the temporary memory. */ + MemoryContextReset(tmpctx); + + /* Wait for more work. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_NAPTIME_PER_CYCLE, + WAIT_EVENT_UNDO_LAUNCHER_MAIN); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } +} + +/* + * UndoWorkerMain -- Main loop for the undo apply worker. + */ +void +UndoWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + Oid dbid; + + dbid = slot_get_dbid(worker_slot); + + /* Setup signal handling */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Connect to the database. */ + BackgroundWorkerInitializeConnectionByOid(dbid, 0, 0); + + /* Attach to slot */ + undo_worker_attach(worker_slot); + + /* + * Create resource owner for undo worker. Undo worker need this as it + * need to read the undo records outside the transaction blocks which + * intern access buffer read routine. 
+ */ + CreateAuxProcessResourceOwner(); + + RollbackFromHT(dbid); + + ReleaseAuxProcessResources(true); + + proc_exit(0); +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1a7a381..b9c0e18 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -159,6 +159,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); break; + case RM_UNDOACTION_ID: + /* Logical decoding is not yet implemented for undoactions. */ + Assert(0); + break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); } diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 4725cbe..1470041 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -29,6 +29,8 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" +#include "postmaster/undoloop.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -152,6 +154,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, BackendRandomShmemSize()); + size = add_size(size, RollbackHTSize()); + size = add_size(size, UndoLauncherShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -226,6 +230,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SUBTRANSShmemInit(); MultiXactShmemInit(); InitBufferPool(); + InitRollbackHashTable(); /* * Set up lock manager @@ -264,6 +269,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) WalSndShmemInit(); WalRcvShmemInit(); ApplyLauncherShmemInit(); + UndoLauncherShmemInit(); /* * Set up 
other modules that need some shared memory space diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 554af46..52f6959 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -51,3 +51,5 @@ BackendRandomLock 43 LogicalRepWorkerLock 44 CLogTruncationLock 45 UndoLogLock 46 +RollbackHTLock 47 +UndoWorkerLock 48 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 33387fb..69c7b6a 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -286,6 +286,8 @@ InitProcGlobal(void) /* Create ProcStructLock spinlock, too */ ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t)); SpinLockInit(ProcStructLock); + + pg_atomic_init_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, 0); } /* diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index 525decb..8b2ad64 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -32,6 +32,7 @@ const char *const LockTagTypeNames[] = { "virtualxid", "speculative token", "object", + "undoaction", "userlock", "advisory" }; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index c693977..12e7704 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -121,6 +121,7 @@ bool allowSystemTableMods = false; int work_mem = 1024; int maintenance_work_mem = 16384; int max_parallel_maintenance_workers = 2; +int rollback_overflow_size = 64; /* * Primary determinants of sizes of shared-memory structures. 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 8b0ade6..48ce9df 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2862,6 +2862,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"rollback_overflow_size", PGC_USERSET, RESOURCES_MEM, + gettext_noop("Rollbacks greater than this size are done lazily"), + NULL, + GUC_UNIT_MB + }, + &rollback_overflow_size, + 64, 0, MAX_KILOBYTES, + NULL, NULL, NULL + }, + + { {"wal_segment_size", PGC_INTERNAL, PRESET_OPTIONS, gettext_noop("Shows the size of write ahead log segments."), NULL, diff --git a/src/backend/utils/misc/pg_controldata.c b/src/backend/utils/misc/pg_controldata.c index a376875..8ebb9c2 100644 --- a/src/backend/utils/misc/pg_controldata.c +++ b/src/backend/utils/misc/pg_controldata.c @@ -78,8 +78,8 @@ pg_control_system(PG_FUNCTION_ARGS) Datum pg_control_checkpoint(PG_FUNCTION_ARGS) { - Datum values[19]; - bool nulls[19]; + Datum values[20]; + bool nulls[20]; TupleDesc tupdesc; HeapTuple htup; ControlFileData *ControlFile; @@ -128,6 +128,8 @@ pg_control_checkpoint(PG_FUNCTION_ARGS) XIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 18, "checkpoint_time", TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 19, "oldest_xid_with_epoch_having_undo", + TIMESTAMPTZOID, -1, 0); tupdesc = BlessTupleDesc(tupdesc); /* Read the control file. 
*/ @@ -202,6 +204,9 @@ pg_control_checkpoint(PG_FUNCTION_ARGS) time_t_to_timestamptz(ControlFile->checkPointCopy.time)); nulls[17] = false; + values[18] = Int64GetDatum(ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo); + nulls[18] = false; + htup = heap_form_tuple(tupdesc, values, nulls); PG_RETURN_DATUM(HeapTupleGetDatum(htup)); diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 1fa02d2..82b9cf6 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -737,4 +737,11 @@ # CUSTOMIZED OPTIONS #------------------------------------------------------------------------------ +# If there are often large transactions requiring rollbacks, then we can push +# them to undo-workers for better performance. The size specified by the +# parameter below determines the minimum size of the rollback requests to be +# sent to the undo-worker. +# +#rollback_overflow_size = 64 + # Add settings for extensions here diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 895a51f..753929e 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -278,6 +278,8 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 6fb403a..72714dd 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -448,6 +448,7 @@ main(int argc, char *argv[]) if (ControlFile.checkPointCopy.oldestXid < 
FirstNormalTransactionId) ControlFile.checkPointCopy.oldestXid += FirstNormalTransactionId; ControlFile.checkPointCopy.oldestXidDB = InvalidOid; + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo = 0; } if (set_oldest_commit_ts_xid != 0) @@ -716,6 +717,8 @@ GuessControlValues(void) ControlFile.checkPointCopy.oldestMultiDB = InvalidOid; ControlFile.checkPointCopy.time = (pg_time_t) time(NULL); ControlFile.checkPointCopy.oldestActiveXid = InvalidTransactionId; + ControlFile.checkPointCopy.nextXid = 0; + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo = 0; ControlFile.state = DB_SHUTDOWNED; ControlFile.time = (pg_time_t) time(NULL); @@ -808,6 +811,8 @@ PrintControlValues(bool guessed) ControlFile.checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile.checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo); printf(_("Maximum data alignment: %u\n"), ControlFile.maxAlign); /* we don't print floatFormat since can't say much useful about it */ @@ -884,6 +889,8 @@ PrintNewControlValues(void) ControlFile.checkPointCopy.oldestXid); printf(_("OldestXID's DB: %u\n"), ControlFile.checkPointCopy.oldestXidDB); + printf(_("OldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo); } if (set_xid_epoch != -1) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 4002847..e71a71e 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -29,7 +29,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undodesc) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 938150d..3ef0a60 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -20,6 +20,7 @@ #include "access/nbtxlog.h" #include "access/rmgr.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -33,7 +34,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undodesc) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56..e1fb42a 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9c6fca4..eee7835 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,26 +25,27 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) 
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) -PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, 
btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL) diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 83ec3f1..7b983ef 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -68,6 +68,10 @@ (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ (int32) ((id1) - (id2)) > 0) +/* Extract xid from a value comprised of epoch and xid */ +#define GetXidFromEpochXid(epochxid) \ + ((uint32) (epochxid) & 0XFFFFFFFF) + /* ---------- * Object ID (OID) zero is InvalidOid. 
* diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 0e932da..fb05c3b 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -41,7 +41,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); -extern void StartPrepare(GlobalTransaction gxact); +extern void StartPrepare(GlobalTransaction gxact, UndoRecPtr *, UndoRecPtr *); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); diff --git a/src/include/access/undoaction.h b/src/include/access/undoaction.h new file mode 100644 index 0000000..5455259 --- /dev/null +++ b/src/include/access/undoaction.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * undoaction.h + * undo action prototypes + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_H +#define UNDOACTION_H + +#include "postgres.h" + +#include "access/undolog.h" +#include "access/undorecord.h" + +/* undo record information */ +typedef struct UndoRecInfo +{ + UndoRecPtr urp; /* undo recptr (undo record location). */ + UnpackedUndoRecord *uur; /* actual undo record. 
*/ +} UndoRecInfo; + +#endif diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h new file mode 100644 index 0000000..bfc6418 --- /dev/null +++ b/src/include/access/undoaction_xlog.h @@ -0,0 +1,74 @@ +/*------------------------------------------------------------------------- + * + * undoaction_xlog.h + * undo action XLOG definitions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_XLOG_H +#define UNDOACTION_XLOG_H + +#include "access/undolog.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "storage/off.h" + +/* + * WAL record definitions for undoactions.c's WAL operations + */ +#define XLOG_UNDO_PAGE 0x00 +#define XLOG_UNDO_RESET_SLOT 0x10 +#define XLOG_UNDO_APPLY_PROGRESS 0x20 + +/* + * xl_undoaction_page flag values, 8 bits are available. + */ +#define XLU_PAGE_CONTAINS_TPD_SLOT (1<<0) +#define XLU_PAGE_CLEAR_VISIBILITY_MAP (1<<1) +#define XLU_CONTAINS_TPD_OFFSET_MAP (1<<2) +#define XLU_INIT_PAGE (1<<3) + +/* This is what we need to know about delete */ +typedef struct xl_undoaction_page +{ + UndoRecPtr urec_ptr; + TransactionId xid; + int trans_slot_id; /* transaction slot id */ +} xl_undoaction_page; + +#define SizeOfUndoActionPage (offsetof(xl_undoaction_page, trans_slot_id) + sizeof(int)) + +/* This is what we need to know about undo apply progress */ +typedef struct xl_undoapply_progress +{ + UndoRecPtr urec_ptr; + uint32 progress; +} xl_undoapply_progress; + +#define SizeOfUndoActionProgress (offsetof(xl_undoapply_progress, progress) + sizeof(uint32)) + +/* + * xl_undoaction_reset_slot flag values, 8 bits are available. 
+ */ +#define XLU_RESET_CONTAINS_TPD_SLOT (1<<0) + +/* This is what we need to know about delete */ +typedef struct xl_undoaction_reset_slot +{ + UndoRecPtr urec_ptr; + int trans_slot_id; /* transaction slot id */ + uint8 flags; +} xl_undoaction_reset_slot; + +#define SizeOfUndoActionResetSlot (offsetof(xl_undoaction_reset_slot, flags) + sizeof(uint8)) + +extern void undoaction_redo(XLogReaderState *record); +extern void undoaction_desc(StringInfo buf, XLogReaderState *record); +extern const char *undoaction_identify(uint8 info); + +#endif /* UNDOACTION_XLOG_H */ diff --git a/src/include/access/undodiscard.h b/src/include/access/undodiscard.h new file mode 100644 index 0000000..70d6408 --- /dev/null +++ b/src/include/access/undodiscard.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * undodiscard.h + * undo discard definitions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undodiscard.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDODISCARD_H +#define UNDODISCARD_H + +#include "access/undolog.h" +#include "access/xlogdefs.h" +#include "catalog/pg_class.h" +#include "storage/lwlock.h" + +/* + * Discard the undo for all the transactions whose xid is smaller than xmin. + * + * Check the DiscardInfo memory array for each slot (every undo log), process + * the undo log for all the slots which have xid smaller than xmin or invalid + * xid. Fetch the record from the undo log transaction by transaction until we + * find the xid which is not smaller than xmin. 
+ */ +extern void UndoDiscard(TransactionId xmin, bool *hibernate); + +#endif /* UNDODISCARD_H */ diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h index fe4a97e..8711432 100644 --- a/src/include/access/undoinsert.h +++ b/src/include/access/undoinsert.h @@ -102,6 +102,8 @@ extern void UndoSetPrepareSize(UnpackedUndoRecord *undorecords, int nrecords, extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen); extern void UndoRecordOnUndoLogChange(UndoPersistence persistence); +extern void PrepareUpdateUndoActionProgress(UndoRecPtr urecptr, int progress); +extern void UndoRecordUpdateTransInfo(void); /* Reset globals related to undo buffers */ extern void ResetUndoBuffers(void); diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h index 9ca2455..34f248f 100644 --- a/src/include/access/undorecord.h +++ b/src/include/access/undorecord.h @@ -20,6 +20,17 @@ #include "storage/buf.h" #include "storage/off.h" +typedef enum undorectype +{ + UNDO_INSERT, + UNDO_MULTI_INSERT, + UNDO_DELETE, + UNDO_INPLACE_UPDATE, + UNDO_UPDATE, + UNDO_XID_LOCK_ONLY, + UNDO_XID_MULTI_LOCK_ONLY, + UNDO_ITEMID_UNUSED +} undorectype; /* * Every undo record begins with an UndoRecordHeader structure, which is @@ -30,6 +41,7 @@ */ typedef struct UndoRecordHeader { + RmgrId urec_rmid; /* RMGR [XXX:TODO: this creates an alignment hole?] 
*/ uint8 urec_type; /* record type code */ uint8 urec_info; /* flag bits */ uint16 urec_prevlen; /* length of previous record in bytes */ @@ -149,6 +161,7 @@ typedef struct UndoRecordPayload */ typedef struct UnpackedUndoRecord { + RmgrId uur_rmid; /* rmgr ID */ uint8 uur_type; /* record type code */ uint8 uur_info; /* flag bits */ uint16 uur_prevlen; /* length of previous record */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 73394c5..ad8cabe 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -419,6 +419,7 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int xactflags, TransactionId twophase_xid, const char *twophase_gid); extern void xact_redo(XLogReaderState *record); +extern void XactPerformUndoActionsIfPending(void); /* xactdesc.c */ extern void xact_desc(StringInfo buf, XLogReaderState *record); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index d4e742f..1091fe7 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -225,6 +225,9 @@ extern bool XLOG_DEBUG; #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +/* Generate a 64-bit xid by using epoch and 32-bit xid. 
*/ +#define MakeEpochXid(epoch, xid) \ + ((epoch << 32) | (xid)) /* Checkpoint statistics */ typedef struct CheckpointStatsData diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 30610b3..9226cff 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -21,8 +21,10 @@ #include "access/xlogdefs.h" #include "access/xlogreader.h" +#include "access/undorecord.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -294,9 +296,14 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + bool (*rm_undo) (List *luinfo, UndoRecPtr urec_ptr, Oid reloid, + TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool rellock, + int options); + void (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record); } RmgrData; -extern const RmgrData RmgrTable[]; +extern PGDLLIMPORT const RmgrData RmgrTable[]; /* * Exported to support xlog switching from checkpointer diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 773d9e6..6918efc 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -61,6 +61,13 @@ typedef struct CheckPoint * set to InvalidTransactionId. */ TransactionId oldestActiveXid; + + /* + * Oldest transaction id with epoc which is having undo. Include this value + * in the checkpoint record so that whenever server starts we get proper + * value. 
+ */ + uint64 oldestXidWithEpochHavingUndo; } CheckPoint; /* XLOG info values for XLOG rmgr */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index d6b32c0..549bb38 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -245,6 +245,7 @@ extern PGDLLIMPORT bool allowSystemTableMods; extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int max_parallel_maintenance_workers; +extern PGDLLIMPORT int rollback_overflow_size; extern int VacuumCostPageHit; extern int VacuumCostPageMiss; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 763379e..7c5f05e 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -749,6 +749,7 @@ typedef enum BackendState #define PG_WAIT_TIMEOUT 0x09000000U #define PG_WAIT_IO 0x0A000000U #define PG_WAIT_PAGE_TRANS_SLOT 0x0B000000U +#define PG_WAIT_ROLLBACK_HT 0x0C000000U /* ---------- * Wait Events - Activity @@ -774,6 +775,8 @@ typedef enum WAIT_EVENT_WAL_RECEIVER_MAIN, WAIT_EVENT_WAL_SENDER_MAIN, WAIT_EVENT_WAL_WRITER_MAIN, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN, + WAIT_EVENT_UNDO_LAUNCHER_MAIN } WaitEventActivity; /* ---------- diff --git a/src/include/postmaster/discardworker.h b/src/include/postmaster/discardworker.h new file mode 100644 index 0000000..f00c6c4 --- /dev/null +++ b/src/include/postmaster/discardworker.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * discardworker.h + * Exports from postmaster/discardworker.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/postmaster/discardworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _DISCARDWORKER_H +#define _DISCARDWORKER_H + +/* + * This function will perform multiple actions based on need. 
(a) retrieve + * transactions which have become all-visible and truncate the associated undo + * logs or will increment the tail pointer. (b) drop the buffers corresponding + * to truncated pages. + */ +extern void DiscardWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern void DiscardWorkerRegister(void); + +#endif /* _DISCARDWORKER_H */ diff --git a/src/include/postmaster/undoloop.h b/src/include/postmaster/undoloop.h new file mode 100644 index 0000000..21d91c3 --- /dev/null +++ b/src/include/postmaster/undoloop.h @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * undoloop.h + * Exports from postmaster/undoloop.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * + * src/include/postmaster/undoloop.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOLOOP_H +#define _UNDOLOOP_H + +#include "access/undoinsert.h" +#include "utils/hsearch.h" +#include "utils/relcache.h" + + +/* Various options while executing the undo actions for the page. */ +#define UNDO_ACTION_UPDATE_TPD 0x0001 + +/* Remembers the last seen RecentGlobalXmin */ +TransactionId latestRecentGlobalXmin; + +/* + * This function will read the undo records starting from the undo + * from_urecptr till to_urecptr and if to_urecptr is invalid then till the + * first undo location of transaction. This also discards the buffers by + * calling DropUndoBuffers for which undo log is removed. This function + * can be used by RollbackToSavePoint, by Rollback, by undoworker to complete + * the work of errored out transactions or when there is an error in single + * user mode. 
+ */ +extern void execute_undo_actions(UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool nopartial, bool rewind, bool rellock); +extern void process_and_execute_undo_actions_page(UndoRecPtr from_urecptr, + Relation rel, Buffer buffer, uint32 epoch, + TransactionId xid, int slot_no); + +/* + * This function will be responsible to truncate the undo logs + * for transactions that become all-visible after RecentGlobalXmin has + * advanced (value is different than latestRecentGlobalXmin). The easiest + * way could be to traverse the undo log array that contains least transaction + * id for that undo log and see if it precedes RecentGlobalXmin, then start + * discarding the undo log for that transaction (moving the tail pointer of + * undo log) till it finds the transaction which is not all-visible. This also + * discards the buffers by calling ForgetBuffer for which undo log is + * removed. This function can be invoked by undoworker or after commit in + * single user mode. + */ +extern void recover_undo_pages(); + +/* + * To increase the efficiency of the zheap system, we create a hash table for + * the rollbacks. All the rollback requests exceeding certain threshold, are + * pushed to this table. Undo worker starts reading the entries from this hash + * table one at a time, performs undo actions related to the respective xid and + * removes them from the hash table. This way backend is free from performing the + * undo actions in case of heavy rollbacks. The data structures and the routines + * required for this infrastructure are as follows. + */ + +/* This is the data structure for each hash table entry for rollbacks. 
*/ +typedef struct RollbackHashEntry +{ + UndoRecPtr start_urec_ptr; + UndoRecPtr end_urec_ptr; + Oid dbid; +} RollbackHashEntry; + +extern bool RollbackHTIsFull(void); + +/* To push the rollback requests from backend to the respective hash table */ +extern bool PushRollbackReq(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr, + Oid dbid); + +/* To perform the undo actions reading from the hash table */ +extern void RollbackFromHT(Oid dbid); +/* To calculate the size of the hash table for rollbacks. */ +extern int RollbackHTSize(void); + +/* To initialize the hash table in shared memory for rollbacks. */ +extern void InitRollbackHashTable(void); +extern List *RollbackHTGetDBList(void); +extern bool ConditionTransactionUndoActionLock(TransactionId xid); +extern void TransactionUndoActionLockRelease(TransactionId xid); +#endif /* _UNDOLOOP_H */ diff --git a/src/include/postmaster/undoworker.h b/src/include/postmaster/undoworker.h new file mode 100644 index 0000000..c277e01 --- /dev/null +++ b/src/include/postmaster/undoworker.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * undoworker.h + * Exports from postmaster/undoworker.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/postmaster/undoworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOWORKER_H +#define _UNDOWORKER_H + +/* GUC options */ +/* undo worker sleep time between rounds */ +extern int UndoWorkerDelay; + +/* + * This function will perform multiple actions based on need. (a) retrieve + * transaction and its corresponding undo pointer from shared memory queue and + * call undoloop to perform undo actions. After applying all the undo records + * for a particular transaction, it will increment the tail pointer in undo log. 
+ * (b) it needs to retrieve transactions which have become all-visible and truncate + * the associated undo logs or will increment the tail pointer. (c) adjust the + * number of undo workers based on the work required to perform undo actions + * (it could be size of shared memory queue containing transactions that need + * aborts). (d) drop the buffers corresponding to truncated pages. (e) Sleep for + * UndoWorkerDelay, if there is no more work. + */ +extern void UndoWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern void UndoLauncherRegister(void); +extern void UndoLauncherShmemInit(void); +extern Size UndoLauncherShmemSize(void); +extern void UndoLauncherMain(Datum main_arg); +extern void UndoWorkerMain(Datum main_arg); + +#endif /* _UNDOWORKER_H */ diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index a37fda7..675761c 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -150,6 +150,8 @@ typedef enum LockTagType LOCKTAG_VIRTUALTRANSACTION, /* virtual transaction (ditto) */ /* ID info for a virtual transaction is its VirtualTransactionId */ LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */ + /* ID info for a transaction undoaction is its transaction id */ + LOCKTAG_TRANSACTION_UNDOACTION, /* transaction (waiting for undoaction) */ /* ID info for a transaction is its TransactionId */ LOCKTAG_OBJECT, /* non-relation database object */ /* ID info for an object is DB OID + CLASS OID + OBJECT OID + SUBID */ @@ -246,6 +248,14 @@ typedef struct LOCKTAG (locktag).locktag_type = LOCKTAG_SPECULATIVE_TOKEN, \ (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD) +#define SET_LOCKTAG_TRANSACTION_UNDOACTION(locktag,xid) \ + ((locktag).locktag_field1 = (xid), \ + (locktag).locktag_field2 = 0, \ + (locktag).locktag_field3 = 0, \ + (locktag).locktag_field4 = 0, \ + (locktag).locktag_type = LOCKTAG_TRANSACTION_UNDOACTION, \ + (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD) + +#define 
SET_LOCKTAG_OBJECT(locktag,dboid,classoid,objoid,objsubid) \ ((locktag).locktag_field1 = (dboid), \ (locktag).locktag_field2 = (classoid), \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cb613c8..f6b9d98 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -270,6 +270,8 @@ typedef struct PROC_HDR int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + /* Oldest transaction id which still has undo. */ + pg_atomic_uint64 oldestXidWithEpochHavingUndo; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; -- 1.8.3.1