From 5d5116aac9db01242e36bc629fa19af816c47e23 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 11 Jun 2021 10:08:42 +0530 Subject: [PATCH v7] 96-Fix decoding of speculative aborts. During decoding for speculative inserts, we were relying for cleaning toast hash on confirmation records or next change records. But that could lead to multiple problems (a) memory leak if there is neither a confirmation record nor any other record after toast insertion for a speculative insert in the transaction, (b) error and assertion failures if the next operation is not an insert/update on the same table. The fix is to start queuing spec abort change and clean up toast hash and change record during its processing. Currently, we are queuing the spec aborts for both toast and main table even though we perform cleanup while processing the main table's spec abort record. Later, if we have a way to distinguish between the spec abort record of toast and the main table, we can avoid queuing the change for spec aborts of toast tables. Reported-by: Ashutosh Bapat Author: Dilip Kumar Reviewed-by: Amit Kapila Backpatch-through: 9.6, where it was introduced Discussion: https://postgr.es/m/CAExHW5sPKF-Oovx_qZe4p5oM6Dvof7_P+XgsNAViug15Fm99jA@mail.gmail.com --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ src/backend/replication/logical/decode.c | 14 ++- src/backend/replication/logical/reorderbuffer.c | 48 ++++++--- src/include/replication/reorderbuffer.h | 11 +- 6 files changed, 245 insertions(+), 26 deletions(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b6fc8da..18dcd2d 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -54,7 +54,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(REGRESSCHECKS) ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top speculative_abort isolationcheck: | submake-isolation submake-test_decoding temp-install $(MKDIR_P) isolation_output diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..6b3cbfb --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertions are handled during logical decoding. +# +# Does this by using advisory locks controlling the progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast. +permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe b) after the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to speculative abort + "controller_unlock_1_2" + # Insert into other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1300902..571a901 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -778,19 +778,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. - */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f0de337..1cd0bbd 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -364,6 +364,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + /* check whether to put into the slab cache */ if (rb->nr_cached_transactions < max_cached_transactions) { @@ -449,6 +452,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) break; /* no data in addition to the struct itself */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1674,8 +1678,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1717,6 +1721,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. + * + * Note that we get the spec abort change for each toast + * entry but we need to perform the cleanup only the first + * time we get it for the main table. + */ + if (specinsert != NULL) + { + /* + * We must clean the toast hash before processing a + * completely new tuple to avoid confusion about the + * previous tuple's toast chunks. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* We don't need this record anymore. */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -1792,16 +1822,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } - /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. - */ - if (specinsert) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } + /* speculative insertion record must be freed by now */ + Assert(!specinsert); /* clean up the iterator */ ReorderBufferIterTXNFinish(rb, iterstate); @@ -2476,6 +2498,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2764,6 +2787,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* the base struct contains all the data, easy peasy */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e085088..ddba4bd 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -44,10 +44,10 @@ typedef struct ReorderBufferTupleBuf * changes. Users of the decoding facilities will never see changes with * *_INTERNAL_* actions. * - * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern - * "speculative insertions", and their confirmation respectively. They're - * used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't - * have to care about these. + * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT + * changes concern "speculative insertions", their confirmation, and abort + * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of + * logical decoding don't have to care about these. */ enum ReorderBufferChangeType { @@ -59,7 +59,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT }; /* -- 1.8.3.1