From de48e72f34fcc7e9cecc7742ebb640733f325a8e Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Thu, 30 Jun 2022 22:07:12 +0300 Subject: [PATCH] Allow locking updated tuples in tuple_update() and tuple_delete() Currently, in read committed transaction isolation mode (default), we have the following sequence of actions when tuple_update()/tuple_delete() finds the tuple updated by a concurrent transaction. 1. Attempt to update/delete tuple with tuple_update()/tuple_delete(), which returns TM_Updated. 2. Lock tuple with tuple_lock(). 3. Re-evaluate plan qual (recheck if we still need to update/delete and calculate the new tuple for update). 4. Second attempt to update/delete tuple with tuple_update()/tuple_delete(). This attempt should be successful, since the tuple was previously locked. This patch eliminates step 2 by taking the lock during the first tuple_update()/tuple_delete() call. Heap table access methods can save effort by traversing the chain of updated tuples once instead of twice. Future undo-based table access methods, which will start from the latest row version, can immediately place a lock there. The code in nodeModifyTable.c is simplified by removing the nested switch/case. Also, we no longer need table_tuple_fetch_row_version() in ExecUpdate(), because the old version of the row should always be the same one that we've previously locked. 
Discussion: https://postgr.es/m/20230102154240.GL1153%40telsasoft.com Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C --- src/backend/access/heap/heapam_handler.c | 489 +++++++++++++---------- src/backend/access/table/tableam.c | 6 +- src/backend/executor/nodeModifyTable.c | 244 +++++------ src/include/access/tableam.h | 26 +- 4 files changed, 392 insertions(+), 373 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index c4b1916d36e..fdf47b7e1c4 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -53,6 +53,12 @@ static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, HeapTuple tuple, OffsetNumber tupoffset); +static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid, + Snapshot snapshot, TupleTableSlot *slot, + CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, bool updated); + static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan); static const TableAmRoutine heapam_methods; @@ -299,14 +305,35 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, static TM_Result heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { + TM_Result result; + /* * Currently Deleting of index tuples are handled at vacuum, in case if * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. 
*/ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + + /* Lock concurrently updated tuple if we were told to do so */ + if (result == TM_Updated && lockUpdated) + { + result = heapam_tuple_lock_internal(relation, tid, snapshot, + lockedSlot, cid, LockTupleExclusive, + LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, true); + + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + } + + return result; } @@ -314,7 +341,8 @@ static TM_Result heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - LockTupleMode *lockmode, bool *update_indexes) + LockTupleMode *lockmode, bool *update_indexes, + bool lockUpdated, TupleTableSlot *lockedSlot) { bool shouldFree = true; HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); @@ -341,6 +369,22 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, if (shouldFree) pfree(tuple); + /* Lock concurrently updated tuple if we were told to do so */ + if (result == TM_Updated && lockUpdated) + { + result = heapam_tuple_lock_internal(relation, otid, snapshot, + lockedSlot, cid, *lockmode, + LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, true); + + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + } + return result; } @@ -350,213 +394,8 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd) { - BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; - TM_Result result; - Buffer buffer; - HeapTuple tuple = &bslot->base.tupdata; - bool follow_updates; - - follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0; - tmfd->traversed = false; - - Assert(TTS_IS_BUFFERTUPLE(slot)); - 
-tuple_lock_retry: - tuple->t_self = *tid; - result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, - follow_updates, &buffer, tmfd); - - if (result == TM_Updated && - (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) - { - /* Should not encounter speculative tuple on recheck */ - Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data)); - - ReleaseBuffer(buffer); - - if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self)) - { - SnapshotData SnapshotDirty; - TransactionId priorXmax; - - /* it was updated, so look at the updated version */ - *tid = tmfd->ctid; - /* updated row should have xmin matching this xmax */ - priorXmax = tmfd->xmax; - - /* signal that a tuple later in the chain is getting locked */ - tmfd->traversed = true; - - /* - * fetch target tuple - * - * Loop here to deal with updated or busy tuples - */ - InitDirtySnapshot(SnapshotDirty); - for (;;) - { - if (ItemPointerIndicatesMovedPartitions(tid)) - ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("tuple to be locked was already moved to another partition due to concurrent update"))); - - tuple->t_self = *tid; - if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, true)) - { - /* - * If xmin isn't what we're expecting, the slot must have - * been recycled and reused for an unrelated tuple. This - * implies that the latest version of the row was deleted, - * so we need do nothing. (Should be safe to examine xmin - * without getting buffer's content lock. We assume - * reading a TransactionId to be atomic, and Xmin never - * changes in an existing tuple, except to invalid or - * frozen, and neither of those can match priorXmax.) - */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), - priorXmax)) - { - ReleaseBuffer(buffer); - return TM_Deleted; - } - - /* otherwise xmin should not be dirty... 
*/ - if (TransactionIdIsValid(SnapshotDirty.xmin)) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"", - SnapshotDirty.xmin, - ItemPointerGetBlockNumber(&tuple->t_self), - ItemPointerGetOffsetNumber(&tuple->t_self), - RelationGetRelationName(relation)))); - - /* - * If tuple is being updated by other transaction then we - * have to wait for its commit/abort, or die trying. - */ - if (TransactionIdIsValid(SnapshotDirty.xmax)) - { - ReleaseBuffer(buffer); - switch (wait_policy) - { - case LockWaitBlock: - XactLockTableWait(SnapshotDirty.xmax, - relation, &tuple->t_self, - XLTW_FetchUpdated); - break; - case LockWaitSkip: - if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) - /* skip instead of waiting */ - return TM_WouldBlock; - break; - case LockWaitError: - if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on row in relation \"%s\"", - RelationGetRelationName(relation)))); - break; - } - continue; /* loop back to repeat heap_fetch */ - } - - /* - * If tuple was inserted by our own transaction, we have - * to check cmin against cid: cmin >= current CID means - * our command cannot see the tuple, so we should ignore - * it. Otherwise heap_lock_tuple() will throw an error, - * and so would any later attempt to update or delete the - * tuple. (We need not check cmax because - * HeapTupleSatisfiesDirty will consider a tuple deleted - * by our transaction dead, regardless of cmax.) We just - * checked that priorXmax == xmin, so we can test that - * variable instead of doing HeapTupleHeaderGetXmin again. - */ - if (TransactionIdIsCurrentTransactionId(priorXmax) && - HeapTupleHeaderGetCmin(tuple->t_data) >= cid) - { - tmfd->xmax = priorXmax; - - /* - * Cmin is the problematic value, so store that. See - * above. 
- */ - tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data); - ReleaseBuffer(buffer); - return TM_SelfModified; - } - - /* - * This is a live tuple, so try to lock it again. - */ - ReleaseBuffer(buffer); - goto tuple_lock_retry; - } - - /* - * If the referenced slot was actually empty, the latest - * version of the row must have been deleted, so we need do - * nothing. - */ - if (tuple->t_data == NULL) - { - Assert(!BufferIsValid(buffer)); - return TM_Deleted; - } - - /* - * As above, if xmin isn't what we're expecting, do nothing. - */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), - priorXmax)) - { - ReleaseBuffer(buffer); - return TM_Deleted; - } - - /* - * If we get here, the tuple was found but failed - * SnapshotDirty. Assuming the xmin is either a committed xact - * or our own xact (as it certainly should be if we're trying - * to modify the tuple), this must mean that the row was - * updated or deleted by either a committed xact or our own - * xact. If it was deleted, we can ignore it; if it was - * updated then chain up to the next version and repeat the - * whole process. - * - * As above, it should be safe to examine xmax and t_ctid - * without the buffer content lock, because they can't be - * changing. We'd better hold a buffer pin though. 
- */ - if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid)) - { - /* deleted, so forget about it */ - ReleaseBuffer(buffer); - return TM_Deleted; - } - - /* updated, so look at the updated row */ - *tid = tuple->t_data->t_ctid; - /* updated row should have xmin matching this xmax */ - priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); - ReleaseBuffer(buffer); - /* loop back to fetch next in chain */ - } - } - else - { - /* tuple was deleted, so give up */ - return TM_Deleted; - } - } - - slot->tts_tableOid = RelationGetRelid(relation); - tuple->t_tableOid = slot->tts_tableOid; - - /* store in slot, transferring existing pin */ - ExecStorePinnedBufferHeapTuple(tuple, slot, buffer); - - return result; + return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid, mode, + wait_policy, flags, tmfd, false); } @@ -2523,6 +2362,236 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, } } +/* + * This routine does the work for heapam_tuple_lock(), but also support + * `updated` to re-use the work done by heapam_tuple_update() or + * heapam_tuple_delete() on fetching tuple and checking its visibility. 
+ */ +static TM_Result +heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, bool updated) +{ + BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; + TM_Result result; + Buffer buffer = InvalidBuffer; + HeapTuple tuple = &bslot->base.tupdata; + bool follow_updates; + + follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0; + tmfd->traversed = false; + + Assert(TTS_IS_BUFFERTUPLE(slot)); + +tuple_lock_retry: + tuple->t_self = *tid; + if (!updated) + result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, + follow_updates, &buffer, tmfd); + else + result = TM_Updated; + + if (result == TM_Updated && + (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) + { + if (!updated) + { + /* Should not encounter speculative tuple on recheck */ + Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data)); + + ReleaseBuffer(buffer); + } + else + { + updated = false; + } + + if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self)) + { + SnapshotData SnapshotDirty; + TransactionId priorXmax; + + /* it was updated, so look at the updated version */ + *tid = tmfd->ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = tmfd->xmax; + + /* signal that a tuple later in the chain is getting locked */ + tmfd->traversed = true; + + /* + * fetch target tuple + * + * Loop here to deal with updated or busy tuples + */ + InitDirtySnapshot(SnapshotDirty); + for (;;) + { + if (ItemPointerIndicatesMovedPartitions(tid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update"))); + + tuple->t_self = *tid; + if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, true)) + { + /* + * If xmin isn't what we're expecting, the slot must have + * been recycled and reused for an unrelated tuple. 
This + * implies that the latest version of the row was deleted, + * so we need do nothing. (Should be safe to examine xmin + * without getting buffer's content lock. We assume + * reading a TransactionId to be atomic, and Xmin never + * changes in an existing tuple, except to invalid or + * frozen, and neither of those can match priorXmax.) + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* otherwise xmin should not be dirty... */ + if (TransactionIdIsValid(SnapshotDirty.xmin)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"", + SnapshotDirty.xmin, + ItemPointerGetBlockNumber(&tuple->t_self), + ItemPointerGetOffsetNumber(&tuple->t_self), + RelationGetRelationName(relation)))); + + /* + * If tuple is being updated by other transaction then we + * have to wait for its commit/abort, or die trying. + */ + if (TransactionIdIsValid(SnapshotDirty.xmax)) + { + ReleaseBuffer(buffer); + switch (wait_policy) + { + case LockWaitBlock: + XactLockTableWait(SnapshotDirty.xmax, + relation, &tuple->t_self, + XLTW_FetchUpdated); + break; + case LockWaitSkip: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + /* skip instead of waiting */ + return TM_WouldBlock; + break; + case LockWaitError: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on row in relation \"%s\"", + RelationGetRelationName(relation)))); + break; + } + continue; /* loop back to repeat heap_fetch */ + } + + /* + * If tuple was inserted by our own transaction, we have + * to check cmin against cid: cmin >= current CID means + * our command cannot see the tuple, so we should ignore + * it. Otherwise heap_lock_tuple() will throw an error, + * and so would any later attempt to update or delete the + * tuple. 
(We need not check cmax because + * HeapTupleSatisfiesDirty will consider a tuple deleted + * by our transaction dead, regardless of cmax.) We just + * checked that priorXmax == xmin, so we can test that + * variable instead of doing HeapTupleHeaderGetXmin again. + */ + if (TransactionIdIsCurrentTransactionId(priorXmax) && + HeapTupleHeaderGetCmin(tuple->t_data) >= cid) + { + tmfd->xmax = priorXmax; + + /* + * Cmin is the problematic value, so store that. See + * above. + */ + tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data); + ReleaseBuffer(buffer); + return TM_SelfModified; + } + + /* + * This is a live tuple, so try to lock it again. + */ + ReleaseBuffer(buffer); + goto tuple_lock_retry; + } + + /* + * If the referenced slot was actually empty, the latest + * version of the row must have been deleted, so we need do + * nothing. + */ + if (tuple->t_data == NULL) + { + Assert(!BufferIsValid(buffer)); + return TM_Deleted; + } + + /* + * As above, if xmin isn't what we're expecting, do nothing. + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* + * If we get here, the tuple was found but failed + * SnapshotDirty. Assuming the xmin is either a committed xact + * or our own xact (as it certainly should be if we're trying + * to modify the tuple), this must mean that the row was + * updated or deleted by either a committed xact or our own + * xact. If it was deleted, we can ignore it; if it was + * updated then chain up to the next version and repeat the + * whole process. + * + * As above, it should be safe to examine xmax and t_ctid + * without the buffer content lock, because they can't be + * changing. We'd better hold a buffer pin though. 
+ */ + if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid)) + { + /* deleted, so forget about it */ + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* updated, so look at the updated row */ + *tid = tuple->t_data->t_ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + ReleaseBuffer(buffer); + /* loop back to fetch next in chain */ + } + } + else + { + /* tuple was deleted, so give up */ + return TM_Deleted; + } + } + + slot->tts_tableOid = RelationGetRelid(relation); + tuple->t_tableOid = slot->tts_tableOid; + + /* store in slot, transferring existing pin */ + ExecStorePinnedBufferHeapTuple(tuple, slot, buffer); + + return result; +} + /* ------------------------------------------------------------------------ * Definition of the heap table access method. diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index ef0d34fceee..f3476697ff0 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot) GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false /* changingPart */ , + false, NULL); switch (result) { @@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid, GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode, update_indexes); + &tmfd, &lockmode, update_indexes, + false, NULL); switch (result) { diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 56398c399c9..ca396ea2dfd 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1284,7 +1284,8 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, */ static TM_Result ExecDeleteAct(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo, - ItemPointer tupleid, bool changingPart) + ItemPointer tupleid, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { EState *estate = context->estate; @@ -1294,7 +1295,9 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, - changingPart); + changingPart, + lockUpdated, + lockedSlot); } /* @@ -1439,7 +1442,16 @@ ExecDelete(ModifyTableContext *context, * transaction-snapshot mode transactions. */ ldelete: - result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart); + + /* + * Ask ExecDeleteAct() to immediately place the lock the updated tuple + * if we will need EvalPlanQual() in that case to handle it. + */ + if (!IsolationUsesXactSnapshot()) + slot = ExecGetReturningSlot(estate, resultRelInfo); + + result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart, + !IsolationUsesXactSnapshot(), slot); switch (result) { @@ -1492,87 +1504,34 @@ ldelete: errmsg("could not serialize access due to concurrent update"))); /* - * Already know that we're going to need to do EPQ, so - * fetch tuple directly into the right slot. + * ExecDeleteAct() have already locked the old tuple for + * us. Now we need to copy it to the right slot. */ EvalPlanQualBegin(context->epqstate); inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); + ExecCopySlot(inputslot, slot); - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - LockTupleExclusive, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); + Assert(context->tmfd.traversed); + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... 
*/ + return NULL; - switch (result) + /* + * If requested, skip delete and pass back the updated + * row. + */ + if (epqreturnslot) { - case TM_Ok: - Assert(context->tmfd.traversed); - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* - * If requested, skip delete and pass back the - * updated row. - */ - if (epqreturnslot) - { - *epqreturnslot = epqslot; - return NULL; - } - else - goto ldelete; - - case TM_SelfModified: - - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. If previously updated by this - * command, ignore the delete, otherwise error - * out. - * - * See also TM_SelfModified response to - * table_tuple_delete() above. - */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be deleted was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - default: - - /* - * TM_Invisible should be impossible because we're - * waiting for updated row versions, and would - * already have errored out if the first version - * is invisible. - * - * TM_Updated should be impossible, because we're - * locking the latest version via - * TUPLE_LOCK_FLAG_FIND_LAST_VERSION. 
- */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; + *epqreturnslot = epqslot; + return NULL; } - - Assert(false); - break; + else + goto ldelete; } case TM_Deleted: @@ -1629,7 +1588,7 @@ ldelete: { ExecForceStoreHeapTuple(oldtuple, slot, false); } - else + else if (TupIsNull(slot)) { if (!table_tuple_fetch_row_version(resultRelationDesc, tupleid, SnapshotAny, slot)) @@ -1904,7 +1863,8 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo, static TM_Result ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - bool canSetTag, UpdateContext *updateCxt) + bool canSetTag, UpdateContext *updateCxt, bool lockUpdated, + TupleTableSlot *lockedSlot) { EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; @@ -2038,7 +1998,8 @@ lreplace: estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, &updateCxt->lockmode, - &updateCxt->updateIndexes); + &updateCxt->updateIndexes, + lockUpdated, lockedSlot); if (result == TM_Ok) updateCxt->updated = true; @@ -2189,7 +2150,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context, static TupleTableSlot * ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - bool canSetTag) + bool canSetTag, bool locked) { EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; @@ -2242,12 +2203,37 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, } else { + TupleTableSlot *oldSlot; + bool lockUpdated; + /* Fill in the slot appropriately */ ExecUpdatePrepareSlot(resultRelInfo, slot, estate); redo_act: + + /* + * Ask ExecUpdateAct() to immediately place the lock the updated tuple + * if we will need EvalPlanQual() in that case to handle it. 
+ */ + if (!IsolationUsesXactSnapshot() && !locked) + { + /* Make sure ri_oldTupleSlot is initialized. */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitUpdateProjection(context->mtstate, + resultRelInfo); + + /* Fetch the most recent version of old tuple. */ + lockUpdated = true; + oldSlot = resultRelInfo->ri_oldTupleSlot; + } + else + { + lockUpdated = false; + oldSlot = NULL; + } + result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot, - canSetTag, &updateCxt); + canSetTag, &updateCxt, lockUpdated, oldSlot); /* * If ExecUpdateAct reports that a cross-partition update was done, @@ -2300,86 +2286,32 @@ redo_act: { TupleTableSlot *inputslot; TupleTableSlot *epqslot; - TupleTableSlot *oldSlot; if (IsolationUsesXactSnapshot()) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); + Assert(!locked); /* - * Already know that we're going to need to do EPQ, so - * fetch tuple directly into the right slot. + * ExecUpdateAct() have already locked the old tuple for + * us. Now we need to copy it to the right slot. */ inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); - - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - updateCxt.lockmode, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); - - switch (result) - { - case TM_Ok: - Assert(context->tmfd.traversed); - - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* Make sure ri_oldTupleSlot is initialized. */ - if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) - ExecInitUpdateProjection(context->mtstate, - resultRelInfo); - - /* Fetch the most recent version of old tuple. 
*/ - oldSlot = resultRelInfo->ri_oldTupleSlot; - if (!table_tuple_fetch_row_version(resultRelationDesc, - tupleid, - SnapshotAny, - oldSlot)) - elog(ERROR, "failed to fetch tuple being updated"); - slot = ExecGetUpdateNewTuple(resultRelInfo, - epqslot, oldSlot); - goto redo_act; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - case TM_SelfModified: - - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. If previously modified by - * this command, ignore the redundant update, - * otherwise error out. - * - * See also TM_SelfModified response to - * table_tuple_update() above. - */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be updated was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; - - default: - /* see table_tuple_lock call in ExecDelete() */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; - } + ExecCopySlot(inputslot, oldSlot); + Assert(context->tmfd.traversed); + + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... 
*/ + return NULL; + slot = ExecGetUpdateNewTuple(resultRelInfo, + epqslot, oldSlot); + goto redo_act; } break; @@ -2623,7 +2555,7 @@ ExecOnConflictUpdate(ModifyTableContext *context, *returning = ExecUpdate(context, resultRelInfo, conflictTid, NULL, resultRelInfo->ri_onConflict->oc_ProjSlot, - canSetTag); + canSetTag, true); /* * Clear out existing tuple, as there might not be another conflict among @@ -2830,7 +2762,8 @@ lmerge_matched: } ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, - newslot, mtstate->canSetTag, &updateCxt); + newslot, mtstate->canSetTag, &updateCxt, + false, NULL); if (result == TM_Ok && updateCxt.updated) { ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, @@ -2848,7 +2781,8 @@ lmerge_matched: result = TM_Ok; break; } - result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + result = ExecDeleteAct(context, resultRelInfo, tupleid, false, + false, NULL); if (result == TM_Ok) { ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL, @@ -3796,7 +3730,7 @@ ExecModifyTable(PlanState *pstate) /* Now apply the update. 
*/ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, - slot, node->canSetTag); + slot, node->canSetTag, false); break; case CMD_DELETE: diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 3fb184717f6..65797cd9f90 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -514,7 +514,9 @@ typedef struct TableAmRoutine Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - bool changingPart); + bool changingPart, + bool lockUpdated, + TupleTableSlot *lockedSlot); /* see table_tuple_update() for reference about parameters */ TM_Result (*tuple_update) (Relation rel, @@ -526,7 +528,9 @@ typedef struct TableAmRoutine bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes); + bool *update_indexes, + bool lockUpdated, + TupleTableSlot *lockedSlot); /* see table_tuple_lock() for reference about parameters */ TM_Result (*tuple_lock) (Relation rel, @@ -1449,6 +1453,9 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, * tmfd - filled in failure cases (see below) * changingPart - true iff the tuple is being moved to another partition * table due to an update of the partition key. Otherwise, false. + * lockUpdated - true if should lock the last row version if the concurrent + * update is detected + * lockedSlot - slot to save the locked tuple when lockUpdated is true * * Normal, successful return value is TM_Ok, which means we did actually * delete it. 
Failure return codes are TM_SelfModified, TM_Updated, and @@ -1461,11 +1468,13 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, static inline TM_Result table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { return rel->rd_tableam->tuple_delete(rel, tid, cid, snapshot, crosscheck, - wait, tmfd, changingPart); + wait, tmfd, changingPart, + lockUpdated, lockedSlot); } /* @@ -1487,6 +1496,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, * lockmode - filled with lock mode acquired on tuple * update_indexes - in success cases this is set to true if new index entries * are required for this tuple + * lockUpdated - true if should lock the last row version if the concurrent + * update is detected + * lockedSlot - slot to save the locked tuple when lockUpdated is true * * Normal, successful return value is TM_Ok, which means we did actually * update it. Failure return codes are TM_SelfModified, TM_Updated, and @@ -1506,12 +1518,14 @@ static inline TM_Result table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes) + bool *update_indexes, bool lockUpdated, + TupleTableSlot *lockedSlot) { return rel->rd_tableam->tuple_update(rel, otid, slot, cid, snapshot, crosscheck, wait, tmfd, - lockmode, update_indexes); + lockmode, update_indexes, + lockUpdated, lockedSlot); } /* -- 2.24.3 (Apple Git-128)