From a9aba2a6ed57b592385c8c35fda5909d580f35ae Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 13 Jun 2019 15:27:58 +0530 Subject: [PATCH 12/14] Infrastructure to execute pending undo actions. To apply the undo actions, we collect the undo records in bulk and try to process them together. We ensure to update the transaction's progress at regular intervals so that after a crash we can skip already applied undo. This provides a way for users to register a callback for processing the undo records based on resource manager. Dilip Kumar, Amit Kapila, Thomas Munro and Kuntal Ghosh with inputs from Robert Haas --- src/backend/access/rmgrdesc/Makefile | 3 +- src/backend/access/rmgrdesc/undoactiondesc.c | 47 +++ src/backend/access/transam/rmgr.c | 5 +- src/backend/access/undo/Makefile | 3 +- src/backend/access/undo/undoaccess.c | 39 ++- src/backend/access/undo/undoaction.c | 450 +++++++++++++++++++++++++++ src/backend/access/undo/undoactionxlog.c | 60 ++++ src/backend/replication/logical/decode.c | 1 + src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 3 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 47 +-- src/include/access/undoaccess.h | 4 + src/include/access/undoaction_xlog.h | 39 +++ src/include/access/undorequest.h | 3 - src/include/access/xlog_internal.h | 8 +- 16 files changed, 679 insertions(+), 37 deletions(-) create mode 100644 src/backend/access/rmgrdesc/undoactiondesc.c create mode 100644 src/backend/access/undo/undoaction.c create mode 100644 src/backend/access/undo/undoactionxlog.c create mode 100644 src/include/access/undoaction_xlog.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 91ad1ef..640d37f 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o 
logicalmsgdesc.o \ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o + smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ + undologdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c new file mode 100644 index 0000000..c396582 --- /dev/null +++ b/src/backend/access/rmgrdesc/undoactiondesc.c @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * undoactiondesc.c + * rmgr descriptor routines for access/undo/undoactionxlog.c + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/undoactiondesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" + +void +undoaction_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_UNDO_APPLY_PROGRESS) + { + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec; + + appendStringInfo(buf, "urec_ptr " UINT64_FORMAT " progress %u", + xlrec->urec_ptr, xlrec->progress); + } +} + +const char * +undoaction_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_UNDO_APPLY_PROGRESS: + id = "UNDO_APPLY_PROGRESS"; + break; + } + + return id; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8b05374..6238240 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -18,6 +18,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/spgxlog.h" +#include 
"access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -31,8 +32,8 @@ #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ + { name, redo, desc, identify, startup, cleanup, mask, undo, undo_desc }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index 7327502..68696bc 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,7 @@ subdir = src/backend/access/undo top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = undoaccess.o undolog.o undorecord.o undorequest.o +OBJS = undoaccess.o undoaction.o undoactionxlog.o undolog.o undorecord.o \ + undorequest.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/undoaccess.c b/src/backend/access/undo/undoaccess.c index 2d413f7..2894cc7 100644 --- a/src/backend/access/undo/undoaccess.c +++ b/src/backend/access/undo/undoaccess.c @@ -65,8 +65,6 @@ static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec, Buffer *prevbuf); static int UndoRecordPrepareTransInfo(UndoRecordInsertContext *context, UndoRecPtr xact_urp, int size, int offset); -static void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, - int idx); static void UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context, UndoRecPtr urecptr, UndoRecPtr xact_urp); static int UndoGetBufferSlot(UndoRecordInsertContext *context, @@ -237,6 +235,41 @@ UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context, } /* + * Prepare to update the undo apply progress in the transaction header. 
+ */ +void +UndoRecordPrepareApplyProgress(UndoRecordInsertContext *context, + UndoRecPtr xact_urp, BlockNumber progress) +{ + int index = 0; + int offset; + + Assert(UndoRecPtrIsValid(xact_urp)); + + /* + * Temporary undo logs are discarded on transaction commit so we don't + * need to do anything. + */ + if (UndoRecPtrGetPersistence(xact_urp) == UNDO_TEMP) + return; + + /* It shouldn't be discarded. */ + Assert(!UndoLogIsDiscarded(xact_urp)); + + /* Compute the offset of urec_progress in the undo record. */ + offset = SizeOfUndoRecordHeader + + offsetof(UndoRecordTransaction, urec_progress); + + index = UndoRecordPrepareTransInfo(context, xact_urp, + sizeof(UndoRecPtr), offset); + /* + * Set the undo action progress in xact_urec_info; this will be written + * into the actual undo record during the update phase. + */ + context->xact_urec_info[index].progress = progress; +} + +/* * Overwrite the first undo record of the previous transaction to update its * next pointer. * @@ -244,7 +277,7 @@ UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context, * This must be called under the critical section. This will just overwrite the * header of the undo record. */ -static void +void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx) { Page page = NULL; diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c new file mode 100644 index 0000000..22360f3 --- /dev/null +++ b/src/backend/access/undo/undoaction.c @@ -0,0 +1,450 @@ +/*------------------------------------------------------------------------- + * + * undoaction.c + * execute undo actions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoaction.c + * + * To apply the undo actions, we collect the undo records in bulk and try to + * process them together. 
We ensure to update the transaction's progress at + * regular intervals so that after a crash we can skip already applied undo. + * The undo apply progress is updated in terms of the number of blocks + * processed. Undo apply progress value XACT_APPLY_PROGRESS_COMPLETED + * indicates that all the undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED + * indicates that no undo action has been applied yet and any other value + * indicates that we have applied undo partially and after crash recovery, we + * need to start processing the undo from the same location. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "access/undoaction_xlog.h" +#include "access/undolog.h" +#include "access/undorequest.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "utils/relfilenodemap.h" +#include "utils/syscache.h" +#include "miscadmin.h" +#include "storage/shmem.h" + +static void UpdateUndoApplyProgress(UndoRecPtr last_log_start_urec_ptr, + BlockNumber block_num); +static bool UndoAlreadyApplied(FullTransactionId full_xid, + UndoRecPtr to_urecptr); +static void ApplyUndo(UndoRecInfo *urecinfo, int nrecords); +static void ProcessAndApplyUndo(FullTransactionId full_xid, + UndoRecPtr from_urecptr, UndoRecPtr to_urecptr, + UndoRecPtr last_log_start_urec_ptr, bool complete_xact); + +/* + * undo_record_comparator + * + * qsort comparator to handle undo record for applying undo actions of the + * transaction. 
+ */ +static int +undo_record_comparator(const void *left, const void *right) +{ + UnpackedUndoRecord *luur = ((UndoRecInfo *) left)->uur; + UnpackedUndoRecord *ruur = ((UndoRecInfo *) right)->uur; + + if (luur->uur_rmid < ruur->uur_rmid) + return -1; + else if (luur->uur_rmid > ruur->uur_rmid) + return 1; + else if (luur->uur_reloid < ruur->uur_reloid) + return -1; + else if (luur->uur_reloid > ruur->uur_reloid) + return 1; + else if (luur->uur_block < ruur->uur_block) + return -1; + else if (luur->uur_block > ruur->uur_block) + return 1; + else if (luur->uur_offset < ruur->uur_offset) + return -1; + else if (luur->uur_offset > ruur->uur_offset) + return 1; + else if (((UndoRecInfo *) left)->index < ((UndoRecInfo *) right)->index) + { + /* + * If records are for the same block and offset, then maintain their + * existing order by comparing their index in the array. + */ + return -1; + } + else + return 1; +} + +/* + * UpdateUndoApplyProgress - Updates how far undo actions from a particular + * log have been applied while rolling back a transaction. This progress is + * measured in terms of undo block number of the undo log till which the + * undo actions have been applied. + */ +static void +UpdateUndoApplyProgress(UndoRecPtr progress_urec_ptr, + BlockNumber block_num) +{ + UndoPersistence persistence; + UndoRecordInsertContext context = {{0}}; + + persistence = + UndoLogNumberGetPersistence(UndoRecPtrGetLogNo(progress_urec_ptr)); + + BeginUndoRecordInsert(&context, persistence, 1, NULL); + + /* + * Prepare and update the undo apply progress in the transaction header. + */ + UndoRecordPrepareApplyProgress(&context, progress_urec_ptr, block_num); + + START_CRIT_SECTION(); + + /* Update the progress in the transaction header. */ + UndoRecordUpdateTransInfo(&context, 0); + + /* WAL log the undo apply progress. 
*/ + { + XLogRecPtr lsn; + xl_undoapply_progress xlrec; + + xlrec.urec_ptr = progress_urec_ptr; + xlrec.progress = block_num; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + + RegisterUndoLogBuffers(&context, 1); + lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS); + UndoLogBuffersSetLSN(&context, lsn); + } + + END_CRIT_SECTION(); + + /* Release undo buffers. */ + FinishUndoRecordInsert(&context); +} + +/* + * UndoAlreadyApplied - Returns true if the actions are already applied, + * false otherwise. + */ +static bool +UndoAlreadyApplied(FullTransactionId full_xid, UndoRecPtr to_urecptr) +{ + UnpackedUndoRecord *uur = NULL; + TransactionId xid PG_USED_FOR_ASSERTS_ONLY = XidFromFullTransactionId(full_xid); + + uur = UndoFetchRecord(to_urecptr, InvalidBlockNumber, InvalidOffsetNumber, + InvalidTransactionId, NULL, NULL); + + /* already processed and discarded */ + if (uur == NULL) + { + /* + * Undo action is already applied, so delete the hash table entry + * if it exists. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr); + return true; + } + + /* already processed */ + if (IsXactApplyProgressCompleted(uur->uur_progress)) + { + /* + * Undo action is already applied, so delete the hash table entry + * if it exists. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr); + UndoRecordRelease(uur); + return true; + } + + Assert(xid == uur->uur_xid); + + UndoRecordRelease(uur); + + return false; +} + +/* + * ApplyUndo - Invoke rmgr specific undo apply functions. + * + * urecinfo - An array of undo records sorted in the rmgr order. + * nrecords - number of records in this array. + */ +static void +ApplyUndo(UndoRecInfo *urecinfo, int nrecords) +{ + int rmgr_start_idx = 0; + int rmgr_nrecords = 0; + int prev_rmid = -1; + int i; + + /* Apply the undo action for each rmgr. 
*/ + for (i = 0; i < nrecords; i++) + { + UnpackedUndoRecord *uur = urecinfo[i].uur; + + Assert(uur->uur_rmid >= 0); + + /* + * If this undo is not for the same rmgr then apply all undo + * actions for the previous rmgr. + */ + if (prev_rmid >= 0 && + prev_rmid != uur->uur_rmid) + { + Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid); + RmgrTable[prev_rmid].rm_undo(rmgr_nrecords, + &urecinfo[rmgr_start_idx]); + + rmgr_start_idx = i; + rmgr_nrecords = 0; + } + + rmgr_nrecords++; + prev_rmid = uur->uur_rmid; + } + + /* Apply the last set of the actions. */ + Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid); + RmgrTable[prev_rmid].rm_undo(rmgr_nrecords, &urecinfo[rmgr_start_idx]); +} + +/* + * ProcessAndApplyUndo - Fetch undo records and apply actions. + * + * We always process the undo of the last log when the undo for a transaction + * spans across multiple logs. Then from there onwards the previous undo logs + * for the same transaction are processed. + * + * We also update the undo apply progress in the transaction header so that + * after recovery we don't need to process the records that are already + * processed. As we update the progress only after one batch of records, + * the crash in-between can cause us to read/apply part of undo records + * again but this will never be more than one-batch. We can further optimize + * it by marking the progress in each record, but that has its own downsides + * like it will generate more WAL and I/O corresponding to dirty undo buffers. 
+ */ +static void +ProcessAndApplyUndo(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, UndoRecPtr last_log_start_urec_ptr, + bool complete_xact) +{ + UndoRecInfo *urecinfo; + UndoRecPtr urec_ptr = from_urecptr; + TransactionId xid PG_USED_FOR_ASSERTS_ONLY = XidFromFullTransactionId(full_xid); + int undo_apply_size; + + /* + * We choose maintenance_work_mem to collect the undo records for + * rollbacks as most of the large rollback requests are done by + * background worker which can be considered as maintenance operation. + * However, we can introduce a new guc for this as well. + */ + undo_apply_size = maintenance_work_mem * 1024L; + + /* + * Fetch the multiple undo records that can fit into undo_apply_size; sort + * them and then call the rmgr specific callback to process them. Repeat this + * until we process all the records for the transaction being rolled back. + */ + do + { + BlockNumber progress_block_num = InvalidBlockNumber; + int i; + int nrecords; + bool low_switched = false; + bool update_progress = false; + + /* + * Fetch multiple undo records at once. + * + * At a time, we only fetch the undo records from a single undo log. + * Once, we process all the undo records from one undo log, we update + * the last_log_start_urec_ptr and proceed to the previous undo log. + */ + urecinfo = UndoBulkFetchRecord(&urec_ptr, last_log_start_urec_ptr, + undo_apply_size, &nrecords, false); + + /* + * Since the rollback of this transaction is in-progress, there will be + * at least one undo record which is not yet discarded. + */ + Assert(nrecords > 0); + + /* + * Get the required information from first and last undo record before + * we sort all the records. + */ + if (complete_xact) + { + if (urecinfo[nrecords - 1].uur->uur_info & UREC_INFO_LOGSWITCH) + { + /* + * We have crossed the log boundary. The rest of the undo for + * this transaction is in some other log, the location of which + * can be found from this record. 
See comments atop undoaccess.c. + */ + low_switched = true; + last_log_start_urec_ptr = urecinfo[nrecords - 1].uur->uur_prevlogstart; + } + else if (UndoRecPtrIsValid(urec_ptr)) + { + /* + * There are still some undo actions pending in this log. So, + * just update the progress block number. + */ + progress_block_num = UndoRecPtrGetBlockNum(urecinfo[nrecords - 1].urp); + + /* + * If we've not fetched undo records for more than one undo + * block, we can't update the progress block number. Because, + * there can still be undo records in this block that needs to + * be applied for rolling back this transaction. + */ + if (UndoRecPtrGetBlockNum(urecinfo[0].urp) > progress_block_num) + update_progress = true; + } + } + + /* + * The undo records must belong to the transaction that is being + * rolled back. + */ + Assert(TransactionIdEquals(xid, urecinfo[0].uur->uur_xid)); + + /* Sort the undo record array in order of target blocks. */ + qsort((void *) urecinfo, nrecords, sizeof(UndoRecInfo), + undo_record_comparator); + + /* Call resource manager specific callbacks to apply actions. */ + ApplyUndo(urecinfo, nrecords); + + /* + * Set undo action apply progress as completed in the transaction header + * if this is a main transaction. + */ + if (complete_xact) + { + if (low_switched) + { + /* + * We have crossed the log boundary. So, mark current log + * header as complete and set the next progress location in the + * previous log. + */ + UpdateUndoApplyProgress(last_log_start_urec_ptr, + XACT_APPLY_PROGRESS_COMPLETED); + } + else if (!UndoRecPtrIsValid(urec_ptr)) + { + /* + * Invalid urec_ptr indicates that we have executed all the undo + * actions for this transaction. So, mark current log header + * as complete. + */ + Assert(last_log_start_urec_ptr == to_urecptr); + UpdateUndoApplyProgress(last_log_start_urec_ptr, + XACT_APPLY_PROGRESS_COMPLETED); + } + else if (update_progress) + { + /* + * Update the progress block number. 
We increase the block + * number by one since the current block might have some undo + * records that are yet to be applied. But, all undo records + * from the next block must have been applied. + */ + UpdateUndoApplyProgress(last_log_start_urec_ptr, + progress_block_num + 1); + } + } + + /* Free all undo records. */ + for (i = 0; i < nrecords; i++) + UndoRecordRelease(urecinfo[i].uur); + + /* Free urp array for the current batch of undo records. */ + pfree(urecinfo); + + /* + * Invalid urec_ptr indicates that we have executed all the undo + * actions for this transaction. + */ + if (!UndoRecPtrIsValid(urec_ptr)) + break; + } while (true); +} + +/* + * execute_undo_actions - Execute the undo actions + * + * full_xid - Transaction id that is getting rolled back. + * from_urecptr - undo record pointer from where to start applying undo + * actions. + * to_urecptr - undo record pointer up to which the undo actions need to be + * applied. + * complete_xact - true if rollback is for complete transaction. + */ +void +execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool complete_xact) +{ + UndoRecPtr last_log_start_urec_ptr = to_urecptr; + + /* 'from' and 'to' pointers must be valid. */ + Assert(from_urecptr != InvalidUndoRecPtr); + Assert(to_urecptr != InvalidUndoRecPtr); + + if (complete_xact) + { + /* + * It is important here to fetch the latest undo record and validate if + * the actions are already executed. The reason is that it is possible + * that discard worker or backend might try to execute the rollback + * request which is already executed. For ex., after discard worker + * fetches the record and found that this transaction need to be + * rolledback, backend might concurrently execute the actions and + * remove the request from rollback hash table. + * + * The other case where this will be required is when the transactions + * records span across multiple logs. 
Say, we register the + * transaction from the first log and then we encounter the same + * transaction in the second log where its status is still not marked + * as done. Now, before we try to register the request for the second + * log, the undo worker came along rolled back the previous request + * and removed its hash entry. In this case, we will successfully + * register the request from the second log and it should be detected + * here. + */ + if (UndoAlreadyApplied(full_xid, to_urecptr)) + return; + + last_log_start_urec_ptr = + RollbackHTGetLastLogStartUrp(full_xid, to_urecptr); + } + + ProcessAndApplyUndo(full_xid, from_urecptr, to_urecptr, + last_log_start_urec_ptr, complete_xact); + + /* + * Undo actions are applied so delete the hash table entry. + */ + RollbackHTRemoveEntry(full_xid, to_urecptr); +} diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c new file mode 100644 index 0000000..2b06d114 --- /dev/null +++ b/src/backend/access/undo/undoactionxlog.c @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * + * undoactionxlog.c + * WAL replay logic for undo actions. + * + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/undoactionxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" +#include "access/undoaccess.h" +#include "access/xlog.h" +#include "access/xlogutils.h" + +/* + * Replay of undo apply progress. 
+ */ +static void +undo_xlog_apply_progress(XLogReaderState *record) +{ + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record); + UndoPersistence persistence; + UndoRecordInsertContext context = {{0}}; + + persistence = + UndoLogNumberGetPersistence(UndoRecPtrGetLogNo(xlrec->urec_ptr)); + + BeginUndoRecordInsert(&context, persistence, 1, record); + + /* Update the undo apply progress in the transaction header. */ + UndoRecordPrepareApplyProgress(&context, xlrec->urec_ptr, + xlrec->progress); + + UndoRecordUpdateTransInfo(&context, 0); + + /* Release undo buffers. */ + FinishUndoRecordInsert(&context); +} + +void +undoaction_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_UNDO_APPLY_PROGRESS: + undo_xlog_apply_progress(record); + break; + default: + elog(PANIC, "undoaction_redo: unknown op code %u", info); + } +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f2ed561..d075aa9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -155,6 +155,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_REPLORIGIN_ID: case RM_GENERIC_ID: case RM_UNDOLOG_ID: + case RM_UNDOACTION_ID: /* just deal with xid, and done */ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 44f4c41..5b49cd4 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 938150d..396193b 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -20,6 +20,7 @@ #include "access/nbtxlog.h" #include "access/rmgr.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -33,7 +34,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56..e1fb42a 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 6945e3e..ef0f6ac 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,26 +25,27 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) 
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) -PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, 
btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL) diff --git a/src/include/access/undoaccess.h b/src/include/access/undoaccess.h index 139e2e8..4d5f292 100644 --- a/src/include/access/undoaccess.h +++ b/src/include/access/undoaccess.h @@ -13,6 +13,7 @@ #ifndef UNDOACCESS_H #define UNDOACCESS_H +#include "access/transam.h" #include "access/undolog.h" #include "access/undorecord.h" #include "access/xlogdefs.h" @@ -91,6 +92,9 @@ typedef struct UndoRecordInsertContext int nxact_urec_info; /* Number of previous xact info. 
*/ } UndoRecordInsertContext; +extern void UndoRecordPrepareApplyProgress(UndoRecordInsertContext *context, + UndoRecPtr urecptr, BlockNumber progress); +extern void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx); extern void BeginUndoRecordInsert(UndoRecordInsertContext *context, UndoPersistence persistence, int nprepared, diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h new file mode 100644 index 0000000..b9e65d1 --- /dev/null +++ b/src/include/access/undoaction_xlog.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * undoaction_xlog.h + * undo action XLOG definitions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_XLOG_H +#define UNDOACTION_XLOG_H + +#include "access/undolog.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "storage/off.h" + +/* + * WAL record definitions for undoaction.c's WAL operations + */ +#define XLOG_UNDO_APPLY_PROGRESS 0x00 + +/* This is what we need to know about undo apply progress */ +typedef struct xl_undoapply_progress +{ + UndoRecPtr urec_ptr; + uint32 progress; +} xl_undoapply_progress; + +#define SizeOfUndoActionProgress (offsetof(xl_undoapply_progress, progress) + sizeof(uint32)) + +extern void undoaction_redo(XLogReaderState *record); +extern void undoaction_desc(StringInfo buf, XLogReaderState *record); +extern const char *undoaction_identify(uint8 info); + +#endif /* UNDOACTION_XLOG_H */ diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h index 81fc3d5..1973a1f 100644 --- a/src/include/access/undorequest.h +++ b/src/include/access/undorequest.h @@ -219,8 +219,5 @@ extern FullTransactionId 
RollbackHTGetOldestFullXid(FullTransactionId oldestXmin /* functions exposed from undoaction.c */ extern void execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, UndoRecPtr to_urecptr, bool nopartial); -extern bool execute_undo_actions_page(UndoRecInfo *urp_array, int first_idx, - int last_idx, Oid reloid, FullTransactionId full_xid, - BlockNumber blkno, bool blk_chain_complete); #endif /* _UNDOREQUEST_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 3cc9c3d..e66d046 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -19,10 +19,14 @@ #ifndef XLOG_INTERNAL_H #define XLOG_INTERNAL_H +#include "access/transam.h" +#include "access/undoaccess.h" +#include "access/undorecord.h" #include "access/xlogdefs.h" #include "access/xlogreader.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -295,9 +299,11 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_undo) (int nrecords, UndoRecInfo *records); + void (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record); } RmgrData; -extern const RmgrData RmgrTable[]; +extern PGDLLIMPORT const RmgrData RmgrTable[]; /* * Exported to support xlog switching from checkpointer -- 1.8.3.1