From c70b5d93f5018e4c0920feefb8615a37a83a147e Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 30 Jan 2020 14:21:04 +0530 Subject: [PATCH v9 12/12] Bugfix handling of incomplete toast tuple --- src/backend/access/heap/heapam.c | 3 + src/backend/replication/logical/decode.c | 17 ++- src/backend/replication/logical/reorderbuffer.c | 145 +++++++++++++----------- src/include/access/heapam_xlog.h | 1 + src/include/replication/reorderbuffer.h | 17 ++- 5 files changed, 110 insertions(+), 73 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 24d0d7a..faaaf67 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2018,6 +2018,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, { xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; + + if (IsToastRelation(relation)) + xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } XLogBeginInsert(); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 13a11ac..4ee528f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -734,7 +734,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } /* @@ -801,7 +803,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -858,7 +861,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -894,7 +898,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(change->data.truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); } /* @@ -999,7 +1003,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); /* move to the next xl_multi_insert_tuple entry */ data += datalen; @@ -1037,7 +1041,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2004d6a..524a66e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -650,7 +650,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change, bool toast_insert) { ReorderBufferTXN *txn; @@ -664,6 +664,28 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn->nentries++; txn->nentries_mem++; + /* + * If this is a toast insert then set the corresponding bit. Otherwise, if + * we have toast insert bit set and this is insert/update then clear the + * bit. + */ + if (toast_insert) + txn->txn_flags |= RBTXN_HAS_TOAST_INSERT; + else if (rbtxn_has_toast_insert(txn) && + ((change->action == REORDER_BUFFER_CHANGE_INSERT) || + (change->action == REORDER_BUFFER_CHANGE_UPDATE))) + txn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + + /* + * If this is a speculative insert then set the corresponding bit. + * Otherwise, if we have speculative insert bit set and this is spec confirm + * record then clear the bit. + */ + if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) + txn->txn_flags |= RBTXN_HAS_SPEC_INSERT; + else if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) + txn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; + /* update memory accounting information */ ReorderBufferChangeMemoryUpdate(rb, change, true); @@ -696,7 +718,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change->data.msg.message = palloc(message_size); memcpy(change->data.msg.message, message, message_size); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -1870,8 +1892,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * disk. */ dlist_delete(&change->node); - ReorderBufferToastAppendChunk(rb, txn, relation, - change); + ReorderBufferToastAppendChunk(rb, txn, relation, + change); } change_done: @@ -2457,7 +2479,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, change->data.snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2506,7 +2528,7 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, change->data.command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2529,6 +2551,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { Size sz; ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2544,7 +2567,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* if subxact, and streaming supported, use the toplevel instead */ if (txn->toptxn && ReorderBufferCanStream(rb)) - txn = txn->toptxn; + toptxn = txn->toptxn; sz = ReorderBufferChangeSize(change); @@ -2552,12 +2575,16 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { txn->size += sz; rb->size += sz; + if (toptxn) + toptxn->size += sz; } else { Assert((rb->size >= sz) && (txn->size >= sz)); txn->size -= sz; rb->size -= sz; + if (toptxn) + toptxn->size -= sz; } Assert(txn->size <= rb->size); @@ -2623,7 +2650,7 @@ ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, change->data.inval.relcacheInitFileInval = relcacheInitFileInval; change->data.inval.msg = msg; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -2810,15 +2837,16 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) txn = dlist_container(ReorderBufferTXN, node, iter.cur); - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + /* + * if the current transaction is larger and doesn't have incomplete data + * remember it. + */ + if (((!largest) || (txn->size > largest->size)) && + ((txn->size > 0) && !rbtxn_has_toast_insert(txn) && + !rbtxn_has_spec_insert(txn))) + largest = txn; } - Assert(largest); - Assert(largest->size > 0); - Assert(largest->size <= rb->size); - return largest; } @@ -2836,66 +2864,51 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) - return; - - /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by streaming, if supported. Otherwise spill to disk. - */ - if (ReorderBufferCanStream(rb)) + /* Loop until we reach under the memory limit. */ + while (rb->size >= logical_decoding_work_mem * 1024L) { /* - * Pick the largest toplevel transaction and evict it from memory by - * streaming the already decoded part. + * Pick the largest transaction (or subtransaction) and evict it from + * memory by streaming, if supported. Otherwise spill to disk. */ - txn = ReorderBufferLargestTopTXN(rb); + if (ReorderBufferCanStream(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); - /* we know there has to be one, because the size is not zero */ - Assert(txn && !txn->toptxn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); + } - ReorderBufferStreamTXN(rb, txn); - } - else - { /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * After eviction, the transaction should have no entries in memory, and + * should use 0 bytes for changes. + * + * XXX Checking the size is fine for both cases - spill to disk and + * streaming. But for streaming we should really check nentries_mem for + * all subtransactions too. */ - txn = ReorderBufferLargestTXN(rb); - - /* we know there has to be one, because the size is not zero */ - Assert(txn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); - - ReorderBufferSerializeTXN(rb, txn); + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); } - /* - * After eviction, the transaction should have no entries in memory, and - * should use 0 bytes for changes. - * - * XXX Checking the size is fine for both cases - spill to disk and - * streaming. But for streaming we should really check nentries_mem for - * all subtransactions too. - */ - Assert(txn->size == 0); - Assert(txn->nentries_mem == 0); - - /* - * And furthermore, evicting the transaction should get us below the - * memory limit again - it is not possible that we're still exceeding the - * memory limit after evicting the transaction. - * - * This follows from the simple fact that the selected transaction is at - * least as large as the most recent change (which caused us to go over - * the memory limit). So by evicting it we're definitely back below the - * memory limit. - */ Assert(rb->size < logical_decoding_work_mem * 1024L); } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cd..aa17f7d 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,7 @@ #define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) +#define XLH_INSERT_ON_TOAST_RELATION (1<<4) /* * xl_heap_update flag values, 8 bits are available. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5e1337e..a2646c5 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -174,6 +174,8 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 #define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -193,6 +195,17 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction's changes has toast insert, without main table insert. */ +#define rbtxn_has_toast_insert(txn) \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ + +/* + * This transaction's changes has speculative insert, without speculative + * confirm. + */ +#define rbtxn_has_spec_insert(txn) \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ + /* * Has this transaction been streamed to downstream? * @@ -547,7 +560,9 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool incomplte_data); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -- 1.8.3.1