diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c index 14bcb34..c5870d8 100644 --- a/src/backend/access/zheap/zheapam.c +++ b/src/backend/access/zheap/zheapam.c @@ -56,6 +56,7 @@ #include "access/zheapam_xlog.h" #include "access/zheap.h" #include "access/zheapscan.h" +#include "access/zheaputils.h" #include "access/zmultilocker.h" #include "catalog/catalog.h" #include "executor/tuptable.h" @@ -88,10 +89,10 @@ static void log_zheap_update(Relation reln, UnpackedUndoRecord undorecord, UnpackedUndoRecord newundorecord, UndoRecPtr urecptr, UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf, ZHeapTuple oldtup, ZHeapTuple newtup, - int old_tup_trans_slot_id, int trans_slot_id, - int new_trans_slot_id, bool inplace_update, - bool all_visible_cleared, bool new_all_visible_cleared, - xl_undolog_meta *undometa); + ZHeapTuple old_key_tuple, int old_tup_trans_slot_id, + int trans_slot_id, int new_trans_slot_id, + bool inplace_update, bool all_visible_cleared, + bool new_all_visible_cleared, xl_undolog_meta *undometa); static HTSU_Result zheap_lock_updated_tuple(Relation rel, ZHeapTuple tuple, ItemPointer ctid, TransactionId xid, LockTupleMode mode, LockOper lockopr, @@ -103,6 +104,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup, TransactionId single_locker_xid, int single_locker_trans_slot, UndoRecPtr prev_urecptr, CommandId cid, bool any_multi_locker_member_alive); +static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, + bool key_changed, bool *copy); static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf, TransactionId tup_xid, int tup_trans_slot, uint16 old_infomask, TransactionId add_to_xid, @@ -1253,6 +1256,7 @@ zheap_delete(Relation relation, ItemPointer tid, CommandId tup_cid; ItemId lp; ZHeapTupleData zheaptup; + ZHeapTuple old_key_tuple = NULL; /* replica identity of the tuple */ UnpackedUndoRecord undorecord; Page page; BlockNumber blkno; @@ -1274,6 +1278,7 @@ 
zheap_delete(Relation relation, ItemPointer tid, bool lock_reacquired; bool hasSubXactLock = false; bool hasPayload = false; + bool old_key_copied = false; xl_undolog_meta undometa; uint8 vm_status; @@ -1956,6 +1961,13 @@ zheap_tuple_updated: vm_status = visibilitymap_get_status(relation, BufferGetBlockNumber(buffer), &vmbuffer); + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + */ + old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true, + &old_key_copied); + START_CRIT_SECTION(); if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) || @@ -2000,6 +2012,7 @@ zheap_tuple_updated: XLogRecPtr RedoRecPtr; uint32 totalundotuplen = 0; Size dataoff; + int bufflags = 0; bool doPageWrites; /* @@ -2020,6 +2033,15 @@ zheap_tuple_updated: xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE; if (hasSubXactLock) xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT; + if (old_key_tuple != NULL) + { + bufflags |= REGBUF_KEEP_DATA; + + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY; + } /* * If full_page_writes is enabled, and the buffer image is not @@ -2065,7 +2087,27 @@ prepare_xlog: totalundotuplen - SizeofZHeapTupleHeader); } - XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags); + + /* + * Log replica identity of the deleted tuple if there is one + */ + if (old_key_tuple != NULL) + { + xl_zheap_header xlzhdr; + + xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask; + xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff; + + XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader); + XLogRegisterBufData(0, + (char *) old_key_tuple->t_data + + SizeofZHeapTupleHeader, + old_key_tuple->t_len - + SizeofZHeapTupleHeader); + } + if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS) (void) 
RegisterTPDBuffer(page, 1); RegisterUndoLogBuffers(2); @@ -2124,6 +2166,9 @@ prepare_xlog: if (have_tuple_lock) UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive); + if (old_key_tuple != NULL && old_key_copied) + zheap_freetuple(old_key_tuple); + pgstat_count_heap_delete(relation); return HeapTupleMayBeUpdated; @@ -2162,6 +2207,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup, ItemId lp; ZHeapTupleData oldtup; ZHeapTuple zheaptup; + ZHeapTuple old_key_tuple = NULL; /* replica identity of the tuple */ UndoRecPtr urecptr, prev_urecptr, new_prev_urecptr; UndoRecPtr new_urecptr = InvalidUndoRecPtr; UnpackedUndoRecord undorecord, new_undorecord; @@ -2199,6 +2245,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup, bool lock_reacquired; bool need_toast; bool hasSubXactLock = false; + bool old_key_copied = false; xl_undolog_meta undometa; uint8 vm_status; uint8 vm_status_new = 0; @@ -3650,6 +3697,15 @@ reacquire_buffer: */ XLogEnsureRecordSpace(8, 0); + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. 
+ */ + old_key_tuple = ZExtractReplicaIdentity(relation, &oldtup, true, + &old_key_copied); + if (old_key_tuple != NULL && !old_key_copied) + old_key_tuple = zheap_copytuple(old_key_tuple); + START_CRIT_SECTION(); if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) || @@ -3806,15 +3862,18 @@ reacquire_buffer: log_zheap_update(relation, undorecord, new_undorecord, urecptr, new_urecptr, buffer, newbuf, - &oldtup, zheaptup, tup_trans_slot_id, - trans_slot_id, new_trans_slot_id, - use_inplace_update, all_visible_cleared, - new_all_visible_cleared, &undometa); + &oldtup, zheaptup, old_key_tuple, + tup_trans_slot_id, trans_slot_id, + new_trans_slot_id, use_inplace_update, + all_visible_cleared, new_all_visible_cleared, + &undometa); } END_CRIT_SECTION(); /* be tidy */ + if (old_key_tuple != NULL) + zheap_freetuple(old_key_tuple); pfree(undorecord.uur_tuple.data); if (undorecord.uur_payload.len > 0) pfree(undorecord.uur_payload.data); @@ -3897,10 +3956,10 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord, UnpackedUndoRecord newundorecord, UndoRecPtr urecptr, UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf, ZHeapTuple oldtup, ZHeapTuple newtup, - int old_tup_trans_slot_id, int trans_slot_id, - int new_trans_slot_id, bool inplace_update, - bool all_visible_cleared, bool new_all_visible_cleared, - xl_undolog_meta *undometa) + ZHeapTuple old_key_tuple, int old_tup_trans_slot_id, + int trans_slot_id, int new_trans_slot_id, + bool inplace_update, bool all_visible_cleared, + bool new_all_visible_cleared, xl_undolog_meta *undometa) { xl_undo_header xlundohdr, xlnewundohdr; @@ -3915,6 +3974,7 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord, XLogRecPtr recptr; XLogRecPtr RedoRecPtr; bool doPageWrites; + bool need_tuple_data = RelationIsLogicallyLogged(reln); char *oldp = NULL; char *newp = NULL; int oldlen, newlen; @@ -3958,7 +4018,8 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord, * See log_heap_update to know under what some 
circumstances we can use * prefix-suffix compression. */ - if (oldbuf == newbuf && !XLogCheckBufferNeedsBackup(newbuf)) + if (oldbuf == newbuf && !need_tuple_data && + !XLogCheckBufferNeedsBackup(newbuf)) { Assert(oldp != NULL && newp != NULL); @@ -4010,6 +4071,17 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord, xlrec.flags |= XLZ_UPDATE_SUFFIX_FROM_OLD; if (undorecord.uur_info & UREC_INFO_PAYLOAD_CONTAINS_SUBXACT) xlrec.flags |= XLZ_UPDATE_CONTAINS_SUBXACT; + if (need_tuple_data) + { + xlrec.flags |= XLZ_UPDATE_CONTAINS_NEW_TUPLE; + if (old_key_tuple) + { + if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_KEY; + } + } if (!inplace_update) { @@ -4079,6 +4151,9 @@ prepare_xlog: totalundotuplen - SizeofZHeapTupleHeader); } + if (need_tuple_data) + bufflags |= REGBUF_KEEP_DATA; + XLogRegisterBuffer(0, newbuf, bufflags); if (oldbuf != newbuf) { @@ -4165,6 +4240,23 @@ prepare_xlog: /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + /* We need to log a tuple identity */ + if (need_tuple_data && old_key_tuple && + !(xlrec.flags & XLZ_HAS_UPDATE_UNDOTUPLE)) + { + xl_zheap_header xlzhdr; + + xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask; + xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff; + + XLogRegisterData((char *) &xlzhdr, SizeOfZHeapHeader); + XLogRegisterData((char *) old_key_tuple->t_data + + SizeofZHeapTupleHeader, + old_key_tuple->t_len - + SizeofZHeapTupleHeader); + } + recptr = XLogInsertExtended(RM_ZHEAP_ID, info, RedoRecPtr, doPageWrites); if (recptr == InvalidXLogRecPtr) { @@ -5941,6 +6033,102 @@ prepare_xlog: } /* + * Build a zheap tuple representing the configured REPLICA IDENTITY to represent + * the old tuple in a UPDATE or DELETE. 
+ *
+ * Returns NULL if there's no need to log an identity or if the relation has
+ * no suitable key.  Otherwise returns 'tp' itself (*copy = false) or a newly
+ * formed key tuple (*copy = true) that the caller should free.  'key_changed'
+ * only matters when just the key columns are logged.  Tuples with external
+ * (TOAST) columns are rejected with an error for now.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * The entire old tuple is logged as is.  It could contain toasted
+		 * columns, which we cannot inline yet, so reject those.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+			elog(ERROR, "toast tables are not supported with replica identity");
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+	if (!RelationIsValid(idx_rel))
+		elog(ERROR, "could not open relation with OID %u", replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.  Reject attno 0 too, it would index nulls[-1].
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		if (attno == 0)
+			elog(ERROR, "invalid attribute number 0 in replica identity index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * The key tuple only contains indexed columns, so toasted values are
+	 * unlikely; we cannot inline them yet, hence the error.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+		elog(ERROR, "toast tables are not supported with replica identity");
+
+	return key_tuple;
+}
+
+/*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
  * infomask and transaction slot.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index bafbbed..fc01f2b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,10 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/zheap.h" +#include "access/zheapam_xlog.h" +#include "access/zheaputils.h" +#include "access/zhtup.h" #include "catalog/pg_control.h" @@ -45,6 +49,8 @@ #include "replication/snapbuild.h" #include "storage/standby.h" +#include "utils/rel.h" +#include "utils/relfilenodemap.h" typedef struct XLogRecordBuffer { @@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -74,6 +81,13 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +/* record handlers for zheap */ +static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static ZHeapTuple DecodeXLogZTuple(char *data, Size len); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, 
ReorderBufferTupleBuf *tup);
@@ -161,7 +175,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
-			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			/* Decode the DML records of the zheap resource manager. */
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +524,73 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for LogicalDecodingProcessRecord().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+	bool		started_tx = false;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* The handlers do relcache lookups, so make sure we are in a transaction. */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+		case XLOG_ZHEAP_UPDATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZUpdate(ctx, buf);
+			break;
+		case XLOG_ZHEAP_MULTI_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZMultiInsert(ctx, buf);
+			break;
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+			/*
+			 * Everything else here is just low level physical stuff we're not
+			 * interested in.
+			 */
+		case XLOG_ZHEAP_FREEZE_XACT_SLOT:
+		case XLOG_ZHEAP_INVALID_XACT_SLOT:
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+
+	/* Commit the transaction if we started one in this function. */
+	if (started_tx)
+		CommitTransactionCommand();
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1149,430 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Open the relation that owns the given relfilenode.
+ *
+ * Raises an error, rather than handing an invalid relation to the caller, if
+ * the relfilenode cannot be mapped to a relation or the relation cannot be
+ * opened.  The caller must release the result with RelationClose().
+ */
+static Relation
+DecodeZOpenRelation(RelFileNode *rnode)
+{
+	Oid			reloid;
+	Relation	relation;
+
+	reloid = RelidByRelfilenode(rnode->spcNode, rnode->relNode);
+	if (!OidIsValid(reloid))
+		elog(ERROR, "could not map relfilenode %u/%u/%u to a relation OID",
+			 rnode->spcNode, rnode->dbNode, rnode->relNode);
+
+	relation = RelationIdGetRelation(reloid);
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u", reloid);
+
+	return relation;
+}
+
+/*
+ * Convert a zheap tuple to heap tuple format and copy the result into a newly
+ * allocated reorder buffer tuple, as the reorder buffer only understands heap
+ * tuples.
+ *
+ * The zheap tuple is not freed here; that is up to the caller.
+ */
+static ReorderBufferTupleBuf *
+DecodeZTupleToTupleBuf(LogicalDecodingContext *ctx, Relation relation,
+					   ZHeapTuple zhtup)
+{
+	ReorderBufferTupleBuf *tuplebuf;
+	HeapTuple	htup;
+
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder,
+										htup->t_len - SizeofHeapTupleHeader);
+	tuplebuf->tuple.t_len = htup->t_len;
+	tuplebuf->tuple.t_self = htup->t_self;
+	tuplebuf->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) tuplebuf->tuple.t_data,
+		   (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(htup);
+
+	return tuplebuf;
+}
+
+/*
+ * Decode a tuple logged as xl_zheap_header plus tuple data (see
+ * DecodeXLogZTuple) and return it as a reorder buffer tuple in heap format.
+ *
+ * 'rnode' identifies the relation the tuple belongs to; its tuple descriptor
+ * is needed for the conversion.
+ */
+static ReorderBufferTupleBuf *
+DecodeXLogZTupleBuf(LogicalDecodingContext *ctx, RelFileNode *rnode,
+					char *data, Size len)
+{
+	ReorderBufferTupleBuf *tuplebuf;
+	Relation	relation;
+	ZHeapTuple	zhtup;
+
+	zhtup = DecodeXLogZTuple(data, len);
+	relation = DecodeZOpenRelation(rnode);
+	tuplebuf = DecodeZTupleToTupleBuf(ctx, relation, zhtup);
+
+	/* be tidy */
+	RelationClose(relation);
+	pfree(zhtup);
+
+	return tuplebuf;
+}
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not XLOG_ZHEAP_MULTI_INSERT!) records into
+ * tuplebufs.
+ *
+ * Here we retrieve the zheap tuple and convert it to heap tuple format, so
+ * that the reorder buffer can understand it.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+	 * same as change stream.
+	 */
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	change->data.tp.newtuple =
+		DecodeXLogZTupleBuf(ctx, &target_node, tupledata, datalen);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		char	   *tupledata;
+		Size		datalen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		change->data.tp.oldtuple =
+			DecodeXLogZTupleBuf(ctx, &target_node, tupledata, datalen);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_UPDATE from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_update *xlrec;
+	ReorderBufferChange *change;
+	char	   *data;
+	RelFileNode target_node;
+
+	data = XLogRecGetData(r);
+	xlrec = (xl_zheap_update *) (data + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* the new tuple is taken from the data of block 0 */
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_NEW_TUPLE)
+	{
+		char	   *tupledata;
+		Size		datalen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		change->data.tp.newtuple =
+			DecodeXLogZTupleBuf(ctx, &target_node, tupledata, datalen);
+	}
+
+	/* the old tuple or key is taken from the tail of the main data */
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_OLD)
+	{
+		char	   *tupledata;
+		Size		datalen;
+		int32		offset;
+
+		/* skip everything that precedes the tuple in the main data */
+		offset = SizeOfZHeapUpdate + SizeOfUndoHeader;
+
+		if (xlrec->flags & XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT)
+			offset += sizeof(int32);
+		if (xlrec->flags & XLZ_NON_INPLACE_UPDATE)
+		{
+			offset += SizeOfUndoHeader;
+
+			if (xlrec->flags & XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT)
+				offset += sizeof(int32);
+		}
+
+		tupledata = data + offset;
+		datalen = XLogRecGetDataLen(r) - offset;
+		change->data.tp.oldtuple =
+			DecodeXLogZTupleBuf(ctx, &target_node, tupledata, datalen);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Decode XLOG_ZHEAP_MULTI_INSERT record into multiple tuplebufs.
+ *
+ * Currently MULTI_INSERT will always contain the full tuples.
+ */
+static void
+DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_multi_insert *xlrec;
+	Relation	relation = NULL;
+	int			i;
+	char	   *data;
+	char	   *tupledata;
+	Size		tuplelen;
+	RelFileNode rnode;
+
+	xlrec = (xl_zheap_multi_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+	if (rnode.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	/* tuplelen is the length of the data of all tuples taken together */
+	tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
+
+	data = tupledata;
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		ReorderBufferChange *change;
+
+		change = ReorderBufferGetChange(ctx->reorder);
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = XLogRecGetOrigin(r);
+
+		memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
+
+		/*
+		 * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
+		 * isn't used for catalogs, but better be future proof.
+		 *
+		 * We decode the tuple in pretty much the same way as
+		 * DecodeXLogZTuple, but since the layout is slightly different, we
+		 * can't use it here.
+		 */
+		if (xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE)
+		{
+			xl_multi_insert_ztuple *xlhdr;
+			ZHeapTupleHeader header;
+			ZHeapTuple	zhtup;
+			int			datalen;
+			int			zhtuplen;
+
+			xlhdr = (xl_multi_insert_ztuple *) SHORTALIGN(data);
+			data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+			datalen = xlhdr->datalen;
+
+			/*
+			 * Size the tuple by its own data length; tuplelen covers the
+			 * data of all tuples in the record.
+			 */
+			zhtuplen = datalen + SizeofZHeapTupleHeader;
+
+			zhtup = palloc(zhtuplen + ZHEAPTUPLESIZE);
+			header = zhtup->t_data = (ZHeapTupleHeader) ((char *) zhtup + ZHEAPTUPLESIZE);
+
+			zhtup->t_len = zhtuplen;
+			/* not a disk based tuple */
+			ItemPointerSetInvalid(&zhtup->t_self);
+
+			/* we can only figure this out after reassembling the transactions */
+			zhtup->t_tableOid = InvalidOid;
+
+			memset(header, 0, SizeofZHeapTupleHeader);
+
+			memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+				   (char *) data,
+				   datalen);
+
+			header->t_infomask = xlhdr->t_infomask;
+			header->t_infomask2 = xlhdr->t_infomask2;
+			header->t_hoff = xlhdr->t_hoff;
+
+			/* open the relation only once for all tuples of the record */
+			if (!RelationIsValid(relation))
+				relation = DecodeZOpenRelation(&rnode);
+
+			change->data.tp.newtuple =
+				DecodeZTupleToTupleBuf(ctx, relation, zhtup);
+
+			data += datalen;
+
+			/* be tidy */
+			pfree(zhtup);
+		}
+
+		/*
+		 * Reset toast reassembly state only after the last row in the last
+		 * xl_multi_insert_ztuple record emitted by one zheap_multi_insert()
+		 * call.
+		 */
+		if (xlrec->flags & XLZ_INSERT_LAST_IN_MULTI &&
+			(i + 1) == xlrec->ntuples)
+			change->data.tp.clear_toast_afterwards = true;
+		else
+			change->data.tp.clear_toast_afterwards = false;
+
+		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+								 buf->origptr, change);
+	}
+	Assert(data == tupledata + tuplelen);
+
+	if (RelationIsValid(relation))
+		RelationClose(relation);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a newly allocated
+ * in-memory tuple.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ *
+ * The caller is responsible to free the memory for tuple allocated by
+ * this function.
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	ZHeapTuple	zhtup;
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	int			tuplelen = datalen + SizeofZHeapTupleHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	zhtup = palloc(tuplelen + ZHEAPTUPLESIZE);
+	header = zhtup->t_data = (ZHeapTupleHeader) ((char *) zhtup + ZHEAPTUPLESIZE);
+
+	zhtup->t_len = tuplelen;
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&zhtup->t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	zhtup->t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+
+	return zhtup;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dc..99a360d 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef
struct xl_zheap_insert #define XLZ_DELETE_CONTAINS_TPD_SLOT (1<<2) #define XLZ_DELETE_CONTAINS_SUBXACT (1<<3) #define XLZ_DELETE_IS_PARTITION_MOVE (1<<4) +#define XLZ_DELETE_CONTAINS_OLD_TUPLE (1<<5) +#define XLZ_DELETE_CONTAINS_OLD_KEY (1<<6) + +/* convenience macro for checking whether any form of old tuple was logged */ +#define XLZ_DELETE_CONTAINS_OLD \ + (XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY) /* This is what we need to know about delete */ typedef struct xl_zheap_delete @@ -155,6 +161,13 @@ typedef struct xl_zheap_delete #define XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT (1<<6) #define XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT (1<<7) #define XLZ_UPDATE_CONTAINS_SUBXACT (1<<8) +#define XLZ_UPDATE_CONTAINS_OLD_TUPLE (1<<9) +#define XLZ_UPDATE_CONTAINS_OLD_KEY (1<<10) +#define XLZ_UPDATE_CONTAINS_NEW_TUPLE (1<<11) + +/* convenience macro for checking whether any form of old tuple was logged */ +#define XLZ_UPDATE_CONTAINS_OLD \ + (XLZ_UPDATE_CONTAINS_OLD_TUPLE | XLZ_UPDATE_CONTAINS_OLD_KEY) /* * This is what we need to know about update|inplace_update