From 8246bfc47e26ceceac35eba7bcad4eba79f09ad7 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 19 May 2020 18:55:23 +0530 Subject: [PATCH v23 06/12] Bugfix handling of incomplete toast/spec insert tuple --- src/backend/access/heap/heapam.c | 3 + src/backend/replication/logical/decode.c | 17 +- .../replication/logical/reorderbuffer.c | 324 ++++++++++++------ src/include/access/heapam_xlog.h | 1 + src/include/replication/reorderbuffer.h | 39 ++- 5 files changed, 277 insertions(+), 107 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 2d77107c4f..3927448f46 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1955,6 +1955,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, { xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; + + if (IsToastRelation(relation)) + xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } XLogBeginInsert(); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 69c1f45ef6..c841687c66 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -727,7 +727,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } /* @@ -794,7 +796,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -851,7 +854,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -887,7 +891,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(change->data.truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); } /* @@ -987,7 +991,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); /* move to the next xl_multi_insert_tuple entry */ data += datalen; @@ -1025,7 +1029,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2cdfb348af..fe2d0011c4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -179,6 +179,21 @@ typedef struct ReorderBufferDiskChange /* data follows */ } ReorderBufferDiskChange; +#define IsSpecInsert(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ +) +#define IsSpecConfirm(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \ +) +#define IsInsertOrUpdate(action) \ +( \ + (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ + ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \ +) + /* * Maximum number of changes kept in memory, per transaction. After that, * changes are spooled to disk. @@ -254,6 +269,8 @@ static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, static inline bool ReorderBufferCanStream(ReorderBuffer *rb); static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); +static inline void ReorderBufferTXNDeleteChange(ReorderBufferTXN *txn, + ReorderBufferChange *change); /* --------------------------------------- * toast reassembly support @@ -646,12 +663,71 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, return txn; } +/* + * Handle incomplete tuple during streaming. If streaming is enabled then we + * might need to stream the in-progress transaction. So the problem is that + * sometime we might get some incomplete changes which we can not stream + * until we get the complete change. e.g. toast table insert without the main + * table insert. So this function remember the lsn of the last complete change + * and the complete size upto last complete lsn so that if we need to stream + * we can only stream upto last complete lsn. + */ +static void +ReorderBufferHandleIncompleteTuple(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, + bool toast_insert) +{ + /* If streaming is not enable then nothing to do. */ + if (!ReorderBufferCanStream(rb)) + return; + + /* Get the top transaction. */ + if (txn->toptxn != NULL) + txn = txn->toptxn; + + /* + * If this is a first incomplete change then set the size of the complete + * change. + */ + if (!(rbtxn_has_incomplete_tuple(txn)) && + (toast_insert || IsSpecInsert(change->action))) + txn->complete_size = txn->total_size; + + /* + * If this is a toast insert then set the corresponding bit. Basically, + * both update and insert will do the insert in the toast table. And as + * explained in the function header we can not stream the only toast + * changes. So whenever we get the toast insert we set the flag and clear + * the same whenever we get the next insert or update on the main table. + */ + if (toast_insert) + txn->txn_flags |= RBTXN_HAS_TOAST_INSERT; + else if (rbtxn_has_toast_insert(txn) && IsInsertOrUpdate(change->action)) + txn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + + /* + * Set the spec insert bit whenever we get the speculative insert to + * indicate the partial tuple and clear the same on speculative confirm. + */ + if (IsSpecInsert(change->action)) + txn->txn_flags |= RBTXN_HAS_SPEC_INSERT; + else if (IsSpecConfirm(change->action)) + txn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; + + /* + * If we don't have any incomplete change after this change then set this + * LSN as last complete lsn. + */ + if (!(rbtxn_has_incomplete_tuple(txn))) + txn->last_complete_lsn = change->lsn; +} + /* * Queue a change into a transaction so it can be replayed upon commit. */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change, bool toast_insert) { ReorderBufferTXN *txn; @@ -660,6 +736,9 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, change->lsn = lsn; change->txn = txn; + /* Handle the incomplete tuple if it's a toast/spec insert */ + ReorderBufferHandleIncompleteTuple(rb, txn, change, toast_insert); + Assert(InvalidXLogRecPtr != lsn); dlist_push_tail(&txn->changes, &change->node); txn->nentries++; @@ -697,7 +776,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change->data.msg.message = palloc(message_size); memcpy(change->data.msg.message, message, message_size); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -1412,6 +1491,30 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { dlist_mutable_iter iter; + ReorderBufferTXN *toptxn; + + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + /* + * Mark the transaction as streamed. + * + * The toplevel transaction, identified by (toptxn==NULL), is marked + * as streamed always, even if it does not contain any changes (that + * is, when all the changes are in subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * changes in them. + * + * We do it this way because of aborts - we don't want to send aborts + * for XIDs the downstream is not aware of. And of course, it always + * knows about the toplevel xact (we send the XID in all messages), + * but we never stream XIDs of empty subxacts. + */ + if ((!txn->toptxn) || (txn->nentries_mem != 0)) + txn->txn_flags |= RBTXN_IS_STREAMED; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1438,30 +1541,28 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) change = dlist_container(ReorderBufferChange, node, iter.cur); - /* remove the change from it's containing list */ - dlist_delete(&change->node); + /* We have truncated upto last complete lsn so stop. */ + if (rbtxn_has_incomplete_tuple(toptxn) && + (change->lsn > toptxn->last_complete_lsn)) + { + /* + * If this is a top transaction then we can reset the + * last_complete_lsn and complete_size, because by now we would + * have stream all the changes upto last_complete_lsn. + */ + if (txn->toptxn == NULL) + { + toptxn->last_complete_lsn = InvalidXLogRecPtr; + toptxn->complete_size = 0; + } + break; + } + /* remove the change from it's containing list */ + ReorderBufferTXNDeleteChange(txn, change); ReorderBufferReturnChange(rb, change); } - /* - * Mark the transaction as streamed. - * - * The toplevel transaction, identified by (toptxn==NULL), is marked - * as streamed always, even if it does not contain any changes (that - * is, when all the changes are in subtransactions). - * - * For subtransactions, we only mark them as streamed when there are - * changes in them. - * - * We do it this way because of aborts - we don't want to send aborts - * for XIDs the downstream is not aware of. And of course, it always - * knows about the toplevel xact (we send the XID in all messages), - * but we never stream XIDs of empty subxacts. - */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) - txn->txn_flags |= RBTXN_IS_STREAMED; - /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak * any memory. We could also keep the hash table and update it with @@ -1473,9 +1574,15 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->tuplecid_hash = NULL; } - /* also reset the number of entries in the transaction */ - txn->nentries_mem = 0; - txn->nentries = 0; + /* + * If this txn is serialized and there are no more entries in the disk then + * clean the disk space. + */ + if (rbtxn_is_serialized(txn) && (txn->nentries == txn->nentries_mem)) + { + ReorderBufferRestoreCleanup(rb, txn); + txn->txn_flags &= ~RBTXN_IS_SERIALIZED; + } } /* @@ -1732,6 +1839,20 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * While streaming a transaction, due to incomplete tuple we can not always + * stream all the changes. So whenever we are deleting any change from the + * change list we need to update the entries count. + */ +static inline void +ReorderBufferTXNDeleteChange(ReorderBufferTXN *txn, ReorderBufferChange *change) +{ + /* Delete the node and decrement the nentries_mem and nentries count. */ + dlist_delete(&change->node); + change->txn->nentries_mem--; + change->txn->nentries--; +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. @@ -1955,8 +2076,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * disk. */ Assert(change->data.tp.newtuple != NULL); - - dlist_delete(&change->node); + ReorderBufferTXNDeleteChange(change->txn, change); ReorderBufferToastAppendChunk(rb, txn, relation, change); } @@ -2002,8 +2122,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, specinsert = NULL; } - /* and memorize the pending insertion */ - dlist_delete(&change->node); + /* + * Remove from the change list and memorize the pending + * insertion + */ + ReorderBufferTXNDeleteChange(change->txn, change); specinsert = change; break; @@ -2118,6 +2241,15 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } + + /* + * If the transaction contains incomplete tuple and this is the + * last complete change then stop further processing of the + * transaction. + */ + if (rbtxn_has_incomplete_tuple(txn) && + prev_lsn == txn->last_complete_lsn) + break; } /* @@ -2515,7 +2647,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, change->data.snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2564,7 +2696,7 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, change->data.command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2587,6 +2719,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { Size sz; ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2601,8 +2734,13 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, txn = change->txn; /* if subxact, and streaming supported, use the toplevel instead */ - if (txn->toptxn && ReorderBufferCanStream(rb)) - txn = txn->toptxn; + if (ReorderBufferCanStream(rb)) + { + if (txn->toptxn) + toptxn = txn->toptxn; + else + toptxn = txn; + } sz = ReorderBufferChangeSize(change); @@ -2610,12 +2748,20 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { txn->size += sz; rb->size += sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size += sz; } else { Assert((rb->size >= sz) && (txn->size >= sz)); txn->size -= sz; rb->size -= sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size -= sz; } Assert(txn->size <= rb->size); @@ -2676,7 +2822,7 @@ ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, memcpy(change->data.inval.invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -2860,18 +3006,28 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) dlist_foreach(iter, &rb->toplevel_by_lsn) { ReorderBufferTXN *txn; + Size size = 0; + Size largest_size = 0; txn = dlist_container(ReorderBufferTXN, node, iter.cur); - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) + /* + * If this transaction have some incomplete changes then only consider + * the size upto last complete lsn. + */ + if (rbtxn_has_incomplete_tuple(txn)) + size = txn->complete_size; + else + size = txn->total_size; + + /* If the current transaction is larger then remember it. */ + if ((largest != NULL || size > largest_size) && size > 0) + { largest = txn; + largest_size = size; + } } - Assert(largest); - Assert(largest->size > 0); - Assert(largest->size <= rb->size); - return largest; } @@ -2889,66 +3045,46 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) - return; - - /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by streaming, if supported. Otherwise spill to disk. - */ - if (ReorderBufferCanStream(rb)) - { - /* - * Pick the largest toplevel transaction and evict it from memory by - * streaming the already decoded part. - */ - txn = ReorderBufferLargestTopTXN(rb); - - /* we know there has to be one, because the size is not zero */ - Assert(txn && !txn->toptxn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); - - ReorderBufferStreamTXN(rb, txn); - } - else + /* Loop until we reach under the memory limit. */ + while (rb->size >= logical_decoding_work_mem * 1024L) { /* * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * memory by streaming, if supported. Otherwise spill to disk. */ - txn = ReorderBufferLargestTXN(rb); + if (ReorderBufferCanStream(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->total_size > 0); + Assert(rb->size >= txn->total_size); - /* we know there has to be one, because the size is not zero */ - Assert(txn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); - ReorderBufferSerializeTXN(rb, txn); - } + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); - /* - * After eviction, the transaction should have no entries in memory, and - * should use 0 bytes for changes. - * - * XXX Checking the size is fine for both cases - spill to disk and - * streaming. But for streaming we should really check nentries_mem for - * all subtransactions too. - */ - Assert(txn->size == 0); - Assert(txn->nentries_mem == 0); + ReorderBufferSerializeTXN(rb, txn); + /* + * After eviction, the transaction should have no entries in memory, and + * should use 0 bytes for changes. + */ + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); + } + } - /* - * And furthermore, evicting the transaction should get us below the - * memory limit again - it is not possible that we're still exceeding the - * memory limit after evicting the transaction. - * - * This follows from the simple fact that the selected transaction is at - * least as large as the most recent change (which caused us to go over - * the memory limit). So by evicting it we're definitely back below the - * memory limit. - */ Assert(rb->size < logical_decoding_work_mem * 1024L); } @@ -3344,10 +3480,6 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); - - Assert(dlist_is_empty(&txn->changes)); - Assert(txn->nentries == 0); - Assert(txn->nentries_mem == 0); } /* diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cdb12..aa17f7df84 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,7 @@ #define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) +#define XLH_INSERT_ON_TOAST_RELATION (1<<4) /* * xl_heap_update flag values, 8 bits are available. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b3e2b3f64b..a9b1aacdb1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -172,6 +172,8 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 #define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -191,6 +193,26 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction's changes has toast insert, without main table insert. */ +#define rbtxn_has_toast_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ +) +/* + * This transaction's changes has speculative insert, without speculative + * confirm. + */ +#define rbtxn_has_spec_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ +) + +/* Check whether this transaction has an incomplete change. */ +#define rbtxn_has_incomplete_tuple(txn) \ +( \ + rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \ +) + /* * Has this transaction been streamed to downstream? * @@ -199,10 +221,6 @@ typedef struct ReorderBufferChange * which case we'd have nentries==0 for the toplevel one, which would say * nothing about the streaming. So we maintain this flag, but only for the * toplevel transaction.) - * - * Note: We never do both stream and serialize a transaction (we only spill - * to disk when streaming is not supported by the plugin), so only one of - * those two flags may be set at any given time. */ #define rbtxn_is_streamed(txn) \ ( \ @@ -350,6 +368,15 @@ typedef struct ReorderBufferTXN * Size of this transaction (changes currently in memory, in bytes). */ Size size; + + /* Size of top-transaction including sub-transactions. */ + Size total_size; + + /* Size of the commplete changes. */ + Size complete_size; + + /* LSN of the last complete change. */ + XLogRecPtr last_complete_lsn; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -537,7 +564,9 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool incomplte_data); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -- 2.23.0