From 490495a4cd8ac57753f9977122bbd9887f4c09c1 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 10 Jan 2020 09:01:35 +0530 Subject: [PATCH v5 08/14] Fix speculative insert bug. --- src/backend/replication/logical/reorderbuffer.c | 23 +++++++++++++++++++---- src/include/replication/reorderbuffer.h | 6 ++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c381d8d..eb6fda5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1701,6 +1701,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change; ReorderBufferChange *specinsert = NULL; + /* + * Resotre any previous speculative inserted tuple if we are running in + * streaming mode. + */ + if (streaming && txn->specinsert != NULL) + { + specinsert = txn->specinsert; + txn->specinsert = NULL; + } + if (using_subtxn) BeginInternalSubTransaction("stream"); else @@ -2029,13 +2039,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. + * In non-streaming mode if there's a speculative insertion remaining, + * just clean in up, it can't have been successful, otherwise we'd + * gotten a confirmation record. For streaming mode, remember the tuple + * so that if we get the confirmation in the next stream we can stream + * it. */ if (specinsert) { - ReorderBufferReturnChange(rb, specinsert); + if (streaming) + txn->specinsert = specinsert; + else + ReorderBufferReturnChange(rb, specinsert); specinsert = NULL; } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index c4a2643..f41e216 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -335,6 +335,12 @@ typedef struct ReorderBufferTXN uint32 ninvalidations; SharedInvalidationMessage *invalidations; + /* + * Speculative insert saved from the last streamed run if the speculative + * confirm has not received in the same stream. + */ + ReorderBufferChange *specinsert; + /* --- * Position in one of three lists: * * list of subtransactions if we are *known* to be subxact -- 1.8.3.1