From de633a9d22fc21f1b7fbe95c2dd6ed28dbbee879 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 13 Jun 2019 15:27:58 +0530 Subject: [PATCH 11/22] Infrastructure to execute pending undo actions. To apply the undo actions, we collect the undo records in bulk and try to process them together. We ensure to update the transaction's progress at regular intervals so that after a crash we can skip already applied undo. This provides a way for users to register a callback for processing the undo records based on resource manager. Dilip Kumar, Amit Kapila, Thomas Munro and Kuntal Ghosh with inputs from Robert Haas --- src/backend/access/rmgrdesc/Makefile | 3 +- src/backend/access/rmgrdesc/undoactiondesc.c | 47 ++ src/backend/access/transam/rmgr.c | 5 +- src/backend/access/undo/Makefile | 3 +- src/backend/access/undo/undoaccess.c | 42 +- src/backend/access/undo/undoaction.c | 516 +++++++++++++++++++ src/backend/access/undo/undoactionxlog.c | 60 +++ src/backend/replication/logical/decode.c | 1 + src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 3 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 52 +- src/include/access/undoaccess.h | 4 + src/include/access/undoaction_xlog.h | 39 ++ src/include/access/undorecord.h | 2 + src/include/access/undorequest.h | 3 - src/include/access/xlog_internal.h | 18 +- 17 files changed, 766 insertions(+), 36 deletions(-) create mode 100644 src/backend/access/rmgrdesc/undoactiondesc.c create mode 100644 src/backend/access/undo/undoaction.c create mode 100644 src/backend/access/undo/undoactionxlog.c create mode 100644 src/include/access/undoaction_xlog.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 91ad1ef8a3d..640d37f37a3 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o + smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ + undologdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c new file mode 100644 index 00000000000..c396582b599 --- /dev/null +++ b/src/backend/access/rmgrdesc/undoactiondesc.c @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * undoactiondesc.c + * rmgr descriptor routines for access/undo/undoactionxlog.c + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/undoactiondesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" + +void +undoaction_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_UNDO_APPLY_PROGRESS) + { + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec; + + appendStringInfo(buf, "urec_ptr %lu progress %u", + xlrec->urec_ptr, xlrec->progress); + } +} + +const char * +undoaction_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_UNDO_APPLY_PROGRESS: + id = "UNDO_APPLY_PROGRESS"; + break; + } + + return id; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8b0537405a9..c57eca240f5 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -18,6 +18,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -31,8 +32,8 @@ #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \ + { name, redo, desc, identify, startup, cleanup, mask, undo, undo_status, undo_desc }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index 73275028be9..68696bc81a8 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,7 @@ subdir = src/backend/access/undo top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = undoaccess.o undolog.o undorecord.o undorequest.o +OBJS = undoaccess.o undoaction.o undoactionxlog.o undolog.o undorecord.o \ + undorequest.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/undoaccess.c b/src/backend/access/undo/undoaccess.c index 92496fbbc8a..36080159619 100644 --- a/src/backend/access/undo/undoaccess.c +++ b/src/backend/access/undo/undoaccess.c @@ -271,6 +271,46 @@ PrepareUndoRecordUpdateNext(UndoRecordInsertContext *context, LWLockRelease(&slot->discard_update_lock); } +/* + * Prepare to update the undo apply progress in the group header. + */ +void +PrepareUndoRecordApplyProgress(UndoRecordInsertContext *context, + UndoRecPtr urecptr, BlockNumber progress) +{ + int index = 0; + int offset; + + Assert(UndoRecPtrIsValid(urecptr)); + + /* + * Temporary undo logs are discarded on transaction commit so we don't need + * to do anything. + */ + if (UndoRecPtrGetCategory(urecptr) == UNDO_TEMP) + return; + + /* + * Here, we are preparing to update the undo apply progress of a + * transaction being rolled back. The undo must not be discarded + * till the transaction is completely rolled back. + */ + Assert(!UndoRecPtrIsDiscarded(urecptr)); + + /* Compute the offset of the urec_progress in the undo record. */ + offset = SizeOfUndoRecordHeader + + offsetof(UndoRecordGroup, urec_progress); + + index = PrepareUndoRecordUpdate(context, urecptr, sizeof(UndoRecPtr), + offset); + + /* + * Set the undo action progress in xact_urec_info, this will be overwritten + * in actual undo record during update phase. + */ + context->urec_update_info[index].progress = progress; +} + /* * Update the undo record * @@ -278,7 +318,7 @@ PrepareUndoRecordUpdateNext(UndoRecordInsertContext *context, * Exact offset to be updated is already computed and necessary buffers are * locked during the prepare phase. */ -static void +void UndoRecordUpdate(UndoRecordInsertContext *context, int idx) { Page page = NULL; diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c new file mode 100644 index 00000000000..4258d15ebc2 --- /dev/null +++ b/src/backend/access/undo/undoaction.c @@ -0,0 +1,516 @@ +/*------------------------------------------------------------------------- + * + * undoaction.c + * execute undo actions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoaction.c + * + * To apply the undo actions, we collect the undo records in bulk and try to + * process them together. We ensure to update the transaction's progress at + * regular intervals so that after a crash we can skip already applied undo. + * The undo apply progress is updated in terms of the number of blocks + * processed. Undo apply progress value XACT_APPLY_PROGRESS_COMPLETED + * indicates that all the undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED + * indicates that no undo action has been applied yet and any other value + * indicates that we have applied undo partially and after crash recovery, we + * need to start processing the undo from the same location. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "access/undoaction_xlog.h" +#include "access/undolog.h" +#include "access/undorequest.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "utils/relfilenodemap.h" +#include "utils/syscache.h" +#include "miscadmin.h" +#include "storage/shmem.h" + +static void UpdateUndoApplyProgress(UndoRecPtr last_log_start_urec_ptr, + BlockNumber block_num); +static bool UndoAlreadyApplied(FullTransactionId full_xid, + UndoRecPtr to_urecptr); +static void ApplyUndo(UndoRecInfo *urecinfo, int nrecords); +static void ProcessAndApplyUndo(FullTransactionId full_xid, + UndoRecPtr from_urecptr, UndoRecPtr to_urecptr, + UndoRecPtr last_log_start_urec_ptr, bool complete_xact); + +/* + * undo_record_comparator - qsort comparator for undo records. + * + * This is used to sort undo records for applying undo actions of the + * transaction. + */ +static int +undo_record_comparator(const void *left, const void *right) +{ + UnpackedUndoRecord *luur = ((UndoRecInfo *) left)->uur; + UnpackedUndoRecord *ruur = ((UndoRecInfo *) right)->uur; + + if (luur->uur_rmid < ruur->uur_rmid) + return -1; + else if (luur->uur_rmid > ruur->uur_rmid) + return 1; + else if (luur->uur_reloid < ruur->uur_reloid) + return -1; + else if (luur->uur_reloid > ruur->uur_reloid) + return 1; + else if (luur->uur_block < ruur->uur_block) + return -1; + else if (luur->uur_block > ruur->uur_block) + return 1; + else if (luur->uur_offset < ruur->uur_offset) + return -1; + else if (luur->uur_offset > ruur->uur_offset) + return 1; + else if (((UndoRecInfo *) left)->index < ((UndoRecInfo *) right)->index) + { + /* + * If records are for the same block and offset, then maintain their + * existing order by comparing their index in the array. + */ + return -1; + } + else + return 1; +} + +/* + * UpdateUndoApplyProgress - Updates how far undo actions from a particular + * log have been applied while rolling back a transaction. This progress is + * measured in terms of undo block number of the undo log till which the + * undo actions have been applied. + */ +static void +UpdateUndoApplyProgress(UndoRecPtr progress_urec_ptr, + BlockNumber block_num) +{ + UndoLogCategory category; + UndoRecordInsertContext context = {{0}}; + + category = + UndoLogNumberGetCategory(UndoRecPtrGetLogNo(progress_urec_ptr)); + + /* + * We don't need to update the progress for temp tables as they get + * discraded after startup. + */ + if (category == UNDO_TEMP) + return; + + BeginUndoRecordInsert(&context, category, 1, NULL); + + /* + * Prepare and update the undo apply progress in the transaction header. + */ + PrepareUndoRecordApplyProgress(&context, progress_urec_ptr, block_num); + + START_CRIT_SECTION(); + + /* Update the progress in the transaction header. */ + UndoRecordUpdate(&context, 0); + + /* WAL log the undo apply progress. */ + { + XLogRecPtr lsn; + xl_undoapply_progress xlrec; + + xlrec.urec_ptr = progress_urec_ptr; + xlrec.progress = block_num; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + + RegisterUndoLogBuffers(&context, 1); + lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS); + UndoLogBuffersSetLSN(&context, lsn); + } + + END_CRIT_SECTION(); + + /* Release undo buffers. */ + FinishUndoRecordInsert(&context); +} + +/* + * UndoAlreadyApplied - Retruns true, if the actions are already applied, + * false, otherwise. + */ +static bool +UndoAlreadyApplied(FullTransactionId full_xid, UndoRecPtr to_urecptr) +{ + UnpackedUndoRecord *uur = NULL; + UndoRecordFetchContext context; + + /* Fetch the undo record. */ + BeginUndoFetch(&context); + uur = UndoFetchRecord(&context, to_urecptr); + FinishUndoFetch(&context); + + /* already processed and discarded */ + if (uur == NULL) + { + /* + * Undo action is already applied, so delete the hash table entry + * if exists. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr, false); + return true; + } + + /* already processed */ + if (IsXactApplyProgressCompleted(uur->uur_group->urec_progress)) + { + /* + * Undo action is already applied, so delete the hash table entry + * if exists. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr, false); + UndoRecordRelease(uur); + return true; + } + + Assert(FullTransactionIdEquals(full_xid, uur->uur_fxid)); + + UndoRecordRelease(uur); + + return false; +} + +/* + * ApplyUndo - Invode rmgr specific undo apply functions. + * + * urecinfo - An array of undo records sorted in the rmgr order. + * nrecords - number of records in this array. + */ +static void +ApplyUndo(UndoRecInfo *urecinfo, int nrecords) +{ + int rmgr_start_idx = 0; + int rmgr_nrecords = 0; + int prev_rmid = -1; + int i; + + /* Apply the undo action for each rmgr. */ + for (i = 0; i < nrecords; i++) + { + UnpackedUndoRecord *uur = urecinfo[i].uur; + + Assert(uur->uur_rmid >= 0); + + /* + * If this undo is not for the same rmgr then apply all undo + * actions for the previous rmgr. + */ + if (prev_rmid >= 0 && + prev_rmid != uur->uur_rmid) + { + Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid); + RmgrTable[prev_rmid].rm_undo(rmgr_nrecords, + &urecinfo[rmgr_start_idx]); + + rmgr_start_idx = i; + rmgr_nrecords = 0; + } + + rmgr_nrecords++; + prev_rmid = uur->uur_rmid; + } + + /* Apply the last set of the actions. */ + Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid); + RmgrTable[prev_rmid].rm_undo(rmgr_nrecords, &urecinfo[rmgr_start_idx]); +} + +/* + * ProcessAndApplyUndo - Fetch undo records and apply actions. + * + * We always process the undo of the last log when the undo for a transaction + * spans across multiple logs. Then from there onwards the previous undo logs + * for the same transaction are processed. + * + * We also update the undo apply progress in the transaction header so that + * after recovery we don't need to process the records that are already + * processed. As we update the progress only after one batch of records, + * the crash in-between can cause us to read/apply part of undo records + * again but this will never be more than one-batch. We can further optimize + * it by marking the progress in each record, but that has its own downsides + * like it will generate more WAL and I/O corresponding to dirty undo buffers. + */ +static void +ProcessAndApplyUndo(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, UndoRecPtr last_log_start_urec_ptr, + bool complete_xact) +{ + UndoRecInfo *urecinfo; + UndoRecPtr urec_ptr = from_urecptr; + int undo_apply_size; + + /* + * We choose maintenance_work_mem to collect the undo records for + * rollbacks as most of the large rollback requests are done by + * background worker which can be considered as maintainence operation. + * However, we can introduce a new guc for this as well. + */ + undo_apply_size = maintenance_work_mem * 1024L; + + /* + * Fetch the multiple undo records that can fit into undo_apply_size; sort + * them and then rmgr specific callback to process them. Repeat this + * until we process all the records for the transaction being rolled back. + */ + while (true) + { + BlockNumber progress_block_num = InvalidBlockNumber; + int i; + int nrecords; + bool log_switched = false; + bool rollback_completed = false; + bool update_progress = false; + UndoRecPtr progress_urec_ptr = InvalidUndoRecPtr; + UndoRecInfo *first_urecinfo; + UndoRecInfo *last_urecinfo; + + CHECK_FOR_INTERRUPTS(); + + /* + * Fetch multiple undo records at once. + * + * At a time, we only fetch the undo records from a single undo log. + * Once, we process all the undo records from one undo log, we update + * the last_log_start_urec_ptr and proceed to the previous undo log. + */ + urecinfo = UndoBulkFetchRecord(&urec_ptr, last_log_start_urec_ptr, + undo_apply_size, &nrecords); + + /* + * Since the rollback of this transaction is in-progress, there will be + * at least one undo record which is not yet discarded. + */ + Assert(nrecords > 0); + + /* + * Get the required information from first and last undo record before + * we sort all the records. + */ + first_urecinfo = &urecinfo[0]; + last_urecinfo = &urecinfo[nrecords - 1]; + if (IsUndoLogSwitched(last_urecinfo->uur)) + { + UndoRecordLogSwitch *logswitch = last_urecinfo->uur->uur_logswitch; + + /* + * We have crossed the log boundary. The rest of the undo for + * this transaction is in some other log, the location of which + * can be found from this record. See commets atop undoaccess.c. + */ + log_switched = true; + + /* + * The last fetched undo record corresponds to the first undo + * record of the current log. Once, the undo actions are performed + * from this log, we've to mark the progress as completed. + */ + progress_urec_ptr = last_urecinfo->urp; + + /* + * We also need to save the start location of this transaction in + * previous log. This will be used in the next iteration of bulk + * fetch and updating progress location. + */ + if (complete_xact) + { + Assert(UndoRecPtrIsValid(logswitch->urec_prevlogstart)); + last_log_start_urec_ptr = logswitch->urec_prevlogstart; + } + + /* We've to update the progress for the current log as completed. */ + update_progress = true; + } + else if (complete_xact) + { + if (UndoRecPtrIsValid(urec_ptr)) + { + /* + * There are still some undo actions pending in this log. So, + * just update the progress block number. + */ + progress_block_num = UndoRecPtrGetBlockNum(last_urecinfo->urp); + + /* + * If we've not fetched undo records for more than one undo + * block, we can't update the progress block number. Because, + * there can still be undo records in this block that needs to + * be applied for rolling back this transaction. + */ + if (UndoRecPtrGetBlockNum(first_urecinfo->urp) > progress_block_num) + { + update_progress = true; + progress_urec_ptr = last_log_start_urec_ptr; + } + } + else + { + /* + * Invalid urec_ptr indicates that we have executed all the undo + * actions for this transaction. So, mark current log header + * as complete. + */ + Assert(last_log_start_urec_ptr == to_urecptr); + rollback_completed = true; + update_progress = true; + progress_urec_ptr = last_log_start_urec_ptr; + } + } + + /* + * The undo records must belong to the transaction that is being + * rolled back. + */ + Assert(FullTransactionIdEquals(full_xid, urecinfo[0].uur->uur_fxid)); + + /* Sort the undo record array in order of target blocks. */ + qsort((void *) urecinfo, nrecords, sizeof(UndoRecInfo), + undo_record_comparator); + + /* Call resource manager specific callbacks to apply actions. */ + ApplyUndo(urecinfo, nrecords); + + /* Set undo action apply progress if required. */ + if (update_progress) + { + Assert(UndoRecPtrIsValid(progress_urec_ptr)); + + if (log_switched || rollback_completed) + { + /* + * We have crossed the log boundary or executed all the undo + * actions for the main transaction. So, mark current log + * header as complete and set the next progress location in + * the previous log. + */ + UpdateUndoApplyProgress(progress_urec_ptr, + XACT_APPLY_PROGRESS_COMPLETED); + } + else + { + /* + * Update the progress block number. We increase the block + * number by one since the current block might have some undo + * records that are yet to be applied. But, all undo records + * from the next block must have been applied. + */ + UpdateUndoApplyProgress(progress_urec_ptr, + progress_block_num + 1); + } + } + + /* Free all undo records. */ + for (i = 0; i < nrecords; i++) + UndoRecordRelease(urecinfo[i].uur); + + /* Free urp array for the current batch of undo records. */ + pfree(urecinfo); + + /* + * Invalid urec_ptr indicates that we have executed all the undo + * actions for this transaction. + */ + if (!UndoRecPtrIsValid(urec_ptr)) + break; + } +} + +/* + * execute_undo_actions - Execute the undo actions + * + * full_xid - Transaction id that is getting rolled back. + * from_urecptr - undo record pointer from where to start applying undo + * actions. + * to_urecptr - undo record pointer up to which the undo actions need to be + * applied. + * complete_xact - true if rollback is for complete transaction. + */ +void +execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool complete_xact) +{ + UndoRecPtr last_log_start_urec_ptr = to_urecptr; + + /* 'from' and 'to' pointers must be valid. */ + Assert(UndoRecPtrIsValid(from_urecptr)); + Assert(UndoRecPtrIsValid(to_urecptr)); + + /* + * Here we compute the last log start urp which is used for fetching the + * undo records and updating the undo action progress. + * + * For rollbacks of subtransaction, we won't be able to calculate the last + * log start urp since we don't have the start urp of the top xid and hence + * we won't be able to follow the transaction chains to find the last log. + */ + if (complete_xact) + { + if (UndoRecPtrGetCategory(to_urecptr) == UNDO_TEMP) + { + UndoRecPtr end_urec_ptr = from_urecptr; + + /* + * For temporary tables, we don't push the rollback request in the + * rollback hash table so we can't directly get the last log start + * urp from there. Instead, we need to compute it now. + */ + (void) FindUndoEndLocationAndSize(to_urecptr, &end_urec_ptr, + &last_log_start_urec_ptr, + full_xid); + } + else + { + /* + * It is important here to fetch the latest undo record and validate if + * the actions are already executed. The reason is that it is possible + * that discard worker or backend might try to execute the rollback + * request which is already executed. For ex., after discard worker + * fetches the record and found that this transaction need to be + * rolledback, backend might concurrently execute the actions and + * remove the request from rollback hash table. + * + * The other case where this will be required is when the transactions + * records span across multiple logs. Say, we register the + * transaction from the first log and then we encounter the same + * transaction in the second log where its status is still not marked + * as done. Now, before we try to register the request for the second + * log, the undo worker came along rolled back the previous request + * and removed its hash entry. In this case, we will successfully + * register the request from the second log and it should be detected + * here. + */ + if (UndoAlreadyApplied(full_xid, to_urecptr)) + return; + + last_log_start_urec_ptr = + RollbackHTGetLastLogStartUrp(full_xid, to_urecptr); + } + } + + ProcessAndApplyUndo(full_xid, from_urecptr, to_urecptr, + last_log_start_urec_ptr, complete_xact); + + /* + * Undo actions are applied so delete the hash table entry. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr, false); +} diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c new file mode 100644 index 00000000000..61e8815b6be --- /dev/null +++ b/src/backend/access/undo/undoactionxlog.c @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * + * undoactionxlog.c + * WAL replay logic for undo actions. + * + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/undoactionxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" +#include "access/undoaccess.h" +#include "access/xlog.h" +#include "access/xlogutils.h" + +/* + * Replay of undo apply progress. + */ +static void +undo_xlog_apply_progress(XLogReaderState *record) +{ + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record); + UndoLogCategory category; + UndoRecordInsertContext context = {{0}}; + + category = + UndoLogNumberGetCategory(UndoRecPtrGetLogNo(xlrec->urec_ptr)); + + BeginUndoRecordInsert(&context, category, 1, record); + + /* Update the undo apply progress in the transaction header. */ + PrepareUndoRecordApplyProgress(&context, xlrec->urec_ptr, + xlrec->progress); + + UndoRecordUpdate(&context, 0); + + /* Release undo buffers. */ + FinishUndoRecordInsert(&context); +} + +void +undoaction_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_UNDO_APPLY_PROGRESS: + undo_xlog_apply_progress(record); + break; + default: + elog(PANIC, "undoaction_redo: unknown op code %u", info); + } +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index a4ecad8488a..7293cc7abdc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -155,6 +155,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_REPLORIGIN_ID: case RM_GENERIC_ID: case RM_UNDOLOG_ID: + case RM_UNDOACTION_ID: /* just deal with xid, and done */ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 63c3879ead8..d64472daea0 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 938150dd915..976f80e9c30 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -20,6 +20,7 @@ #include "access/nbtxlog.h" #include "access/rmgr.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -33,7 +34,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56a4c6..0a3794a44e5 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 6945e3e9504..b424c3ca5cc 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -24,27 +24,31 @@ * Changes to this list possibly need an XLOG_PAGE_MAGIC bump. */ -/* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) -PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL) +/* + * symbol name, textual name, redo, desc, identify, startup, cleanup, mask, + * undo, undo_status, undo_desc + */ +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask, NULL, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL, NULL) diff --git a/src/include/access/undoaccess.h b/src/include/access/undoaccess.h index 1a9640c4e76..35593546801 100644 --- a/src/include/access/undoaccess.h +++ b/src/include/access/undoaccess.h @@ -13,6 +13,7 @@ #ifndef UNDOACCESS_H #define UNDOACCESS_H +#include "access/transam.h" #include "access/undolog.h" #include "access/undorecord.h" #include "access/xlogdefs.h" @@ -92,6 +93,9 @@ typedef struct UndoRecordFetchContext UndoRecPtr urp; /* Previous undo record pointer. */ } UndoRecordFetchContext; +extern void PrepareUndoRecordApplyProgress(UndoRecordInsertContext *context, + UndoRecPtr urecptr, BlockNumber progress); +extern void UndoRecordUpdate(UndoRecordInsertContext *context, int idx); extern void BeginUndoRecordInsert(UndoRecordInsertContext *context, UndoLogCategory category, int nprepared, diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h new file mode 100644 index 00000000000..b9e65d1f7a8 --- /dev/null +++ b/src/include/access/undoaction_xlog.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * undoaction_xlog.h + * undo action XLOG definitions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_XLOG_H +#define UNDOACTION_XLOG_H + +#include "access/undolog.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "storage/off.h" + +/* + * WAL record definitions for undoactions.c's WAL operations + */ +#define XLOG_UNDO_APPLY_PROGRESS 0x00 + +/* This is what we need to know about undo apply progress */ +typedef struct xl_undoapply_progress +{ + UndoRecPtr urec_ptr; + uint32 progress; +} xl_undoapply_progress; + +#define SizeOfUndoActionProgress (offsetof(xl_undoapply_progress, progress) + sizeof(uint32)) + +extern void undoaction_redo(XLogReaderState *record); +extern void undoaction_desc(StringInfo buf, XLogReaderState *record); +extern const char *undoaction_identify(uint8 info); + +#endif /* UNDOACTION_XLOG_H */ diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h index 80beaef0a74..30b7db46ab7 100644 --- a/src/include/access/undorecord.h +++ b/src/include/access/undorecord.h @@ -254,6 +254,8 @@ typedef struct UnpackedUndoRecord * during a transaction. */ } UnpackedUndoRecord; +#define IsUndoLogSwitched(uur) (uur->uur_logswitch != NULL) + extern size_t UndoRecordExpectedSize(UnpackedUndoRecord *uur); extern size_t UndoRecordPayloadSize(UnpackedUndoRecord *uur); extern void BeginInsertUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur, diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h index 64d89ba9720..82640b88f44 100644 --- a/src/include/access/undorequest.h +++ b/src/include/access/undorequest.h @@ -224,8 +224,5 @@ extern FullTransactionId RollbackHTGetOldestFullXid(FullTransactionId oldestXmin /* functions exposed from undoaction.c */ extern void execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, UndoRecPtr to_urecptr, bool nopartial); -extern bool execute_undo_actions_page(UndoRecInfo *urp_array, int first_idx, - int last_idx, Oid reloid, FullTransactionId full_xid, - BlockNumber blkno, bool blk_chain_complete); #endif /* _UNDOREQUEST_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 3f0de6625d7..1ff312f7473 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -19,10 +19,14 @@ #ifndef XLOG_INTERNAL_H #define XLOG_INTERNAL_H +#include "access/transam.h" +#include "access/undoaccess.h" +#include "access/undorecord.h" #include "access/xlogdefs.h" #include "access/xlogreader.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -270,6 +274,15 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN } RecoveryTargetAction; +/* + * Return values for undo status callback functions. + */ +typedef enum UndoStatus +{ + UNDO_STATUS_WAIT_XMIN, /* wait until the xmin passes an xid */ + UNDO_STATUS_DISCARD /* the record set should be discarded */ +} UndoStatus; + /* * Method table for resource managers. * @@ -295,9 +308,12 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_undo) (int nrecords, UndoRecInfo *records); + UndoStatus (*rm_undo_status) (UnpackedUndoRecord *record, TransactionId *xid); + void (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record); } RmgrData; -extern const RmgrData RmgrTable[]; +extern PGDLLIMPORT const RmgrData RmgrTable[]; /* * Exported to support xlog switching from checkpointer -- 2.22.0