From 9777828b8f52fa1175fc1bf4ca2341b0bdb2cc9c Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 3 Oct 2019 09:00:49 +0530 Subject: [PATCH 06/13] Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside. When a transaction aborts, its changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions, this is not an issue, and we never decode transactions that abort before the decoding starts. But for in-progress transactions, for example when decoding prepared transactions on PREPARE (and not COMMIT PREPARED as before), this may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by raising an error with sqlerrcode ERRCODE_TRANSACTION_ROLLBACK from the system table scan APIs in the backend that is decoding a specific uncommitted transaction. On receipt of such an sqlerrcode, the decoding logic aborts the ongoing decoding and returns gracefully. 
--- doc/src/sgml/logicaldecoding.sgml | 5 ++- src/backend/access/heap/heapam.c | 51 +++++++++++++++++++++++++ src/backend/access/index/genam.c | 34 +++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 32 +++++++++++++--- src/backend/utils/time/snapmgr.c | 25 +++++++++++- src/include/utils/snapmgr.h | 4 +- 6 files changed, 142 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index fc4ad65..da6a6f3 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -432,7 +432,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. That, among others, + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* scan APIs only. + Access via the heap_* scan APIs will error out. + Additionally, any actions leading to transaction ID assignment are prohibited. That, among others, includes writing to tables, performing DDL changes, and calling txid_current(). diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e954482..6ce7878 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1303,6 +1303,17 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg_internal("only heap AM is supported"))); + /* + * We don't expect direct calls to heap_getnext with valid + * CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(scan->rs_base.rs_rd) || + RelationIsUsedAsCatalogTable(scan->rs_base.rs_rd)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_getnext call"))); + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ @@ -1422,6 +1433,16 @@ heap_fetch(Relation relation, bool valid; /* + * We don't expect direct calls to heap_fetch with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_fetch call"))); + + /* * Fetch and pin the appropriate page of the relation. */ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); @@ -1535,6 +1556,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, bool valid; bool skip; + /* + * We don't expect direct calls to heap_hot_search_buffer with + * valid CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search_buffer call"))); + /* If this is not the first call, previous call returned a (live!) tuple */ if (all_dead) *all_dead = first_call; @@ -1683,6 +1714,16 @@ heap_get_latest_tid(TableScanDesc sscan, Assert(ItemPointerIsValid(tid)); /* + * We don't expect direct calls to heap_get_latest_tid with valid + * CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_get_latest_tid call"))); + + /* * Loop to chase down t_ctid links. At top of loop, ctid is the tuple we * need to examine, and *tid is the TID we will return if ctid turns out * to be bogus. @@ -5481,6 +5522,16 @@ heap_finish_speculative(Relation relation, ItemPointer tid) ItemId lp = NULL; HeapTupleHeader htup; + /* + * We don't expect direct calls to heap_hot_search with + * valid CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search call"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index 2599b5d..201acfb 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -28,6 +28,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -477,6 +478,17 @@ systable_getnext(SysScanDesc sysscan) } } + /* + * If CheckXidAlive is valid, then we check if it aborted. 
If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } @@ -513,6 +525,17 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) sysscan->slot, freshsnap); + /* + * If CheckXidAlive is valid, then we check if it aborted. If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return result; } @@ -639,6 +662,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * If CheckXidAlive is valid, then we check if it aborted. 
If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ca4b904..1955bc5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -679,7 +679,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); + SetupHistoricSnapshot(snapshot_now, NULL, xid); PG_TRY(); { rb->message(rb, txn, lsn, false, prefix, message_size, message); @@ -1496,6 +1496,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; + MemoryContext ccxt = CurrentMemoryContext; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1529,7 +1530,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferBuildTupleCidHash(rb, txn); /* setup the initial snapshot */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); /* * Decoding needs access to syscaches et al., which in turn use @@ -1780,7 +1781,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* and continue with the new one */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -1800,7 +1801,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now->curcid = command_id; 
TeardownHistoricSnapshot(false); - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); /* * Every time the CommandId is incremented, we could @@ -1879,6 +1880,20 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_CATCH(); { /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + + /* + * if the catalog scan access returned an error of + * rollback, then abort on the other side as well + */ + if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + rb->abort(rb, txn, commit_lsn); + } + if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); @@ -1902,7 +1917,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); - PG_RE_THROW(); + /* re-throw only if it's not an abort */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + FlushErrorState(); } PG_END_TRY(); } diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 47b0517..9fa1e43 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -154,6 +154,13 @@ static Snapshot CatalogSnapshot = NULL; static Snapshot HistoricSnapshot = NULL; /* + * An xid value pointing to a possibly ongoing or a prepared transaction. + * Currently used in logical decoding. It's possible that such transactions + * can get aborted while the decoding is ongoing. + */ +TransactionId CheckXidAlive = InvalidTransactionId; + +/* * These are updated by GetSnapshotData. We initialize them this way * for the convenience of TransactionIdIsInProgress: even in bootstrap * mode, we don't want it to say that BootstrapTransactionId is in progress. 
@@ -2029,10 +2036,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin) * Setup a snapshot that replaces normal catalog snapshots that allows catalog * access to behave just like it did at a certain point in the past. * + * If a valid xid is passed in, we check if it is uncommitted and track it in + * CheckXidAlive. This is to re-check XID status while accessing catalog. + * * Needed for logical decoding. */ void -SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) +SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids, + TransactionId snapshot_xid) { Assert(historic_snapshot != NULL); @@ -2041,8 +2052,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) /* setup (cmin, cmax) lookup hash */ tuplecid_data = tuplecids; -} + /* + * setup CheckXidAlive if it's not committed yet. We don't check + * if the xid aborted. That will happen during catalog access. + */ + if (TransactionIdIsValid(snapshot_xid) && + !TransactionIdDidCommit(snapshot_xid)) + CheckXidAlive = snapshot_xid; + else + CheckXidAlive = InvalidTransactionId; +} /* * Make catalog snapshots behave normally again. 
@@ -2052,6 +2072,7 @@ TeardownHistoricSnapshot(bool is_error) { HistoricSnapshot = NULL; tuplecid_data = NULL; + CheckXidAlive = InvalidTransactionId; } bool diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 67b07df..9a8f9ce 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -145,8 +145,10 @@ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* Support for catalog timetravel for logical decoding */ struct HTAB; +extern TransactionId CheckXidAlive; extern struct HTAB *HistoricSnapshotGetTupleCids(void); -extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); +extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids, + TransactionId snapshot_xid); extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); -- 1.8.3.1