Re: BUG #13725: Logical Decoding - wrong results with large transactions and unfortunate timing - Mailing list pgsql-bugs
Subject | Re: BUG #13725: Logical Decoding - wrong results with large transactions and unfortunate timing |
Msg-id | CAPL_MpMjuQbXZ5xaP8GEmaXpOwbKHR1viUrP1ZQrXao4JghdKA@mail.gmail.com |
In response to | Re: BUG #13725: Logical Decoding - wrong results with large transactions and unfortunate timing (Andres Freund <andres@anarazel.de>) |
List | pgsql-bugs |
Yes, this is a small script to reproduce; the real code is Java, where we saw
sporadic wrong results. However, I'm interested in CDC (getting change
notifications per row to my app), not PG-to-PG replication.

Hope it makes sense,
Ofir

On Mon, Oct 26, 2015 at 1:05 PM, Andres Freund <andres@anarazel.de> wrote:

> On 2015-10-26 10:43:08 +0100, Shulgin, Oleksandr wrote:
> > > Test case, script 1.
> > > I clean up the previous test, create a new table and two replication slots.
> > > Then, I run a script in the background (it will trigger the bug), insert
> > > 100K rows, and try to get them with pg_logical_slot_get_changes from the
> > > first slot.
> > > This works correctly every time - I get back the 100,000 rows + one
> > > BEGIN + one COMMIT.
> > >
> > > psql --quiet << EOF
> > > SELECT pg_drop_replication_slot('test1_slot');
> > > SELECT pg_drop_replication_slot('test2_slot');
> > > DROP TABLE test;
> > > CREATE TABLE test (id int primary key, v varchar);
> > > SELECT * FROM pg_create_logical_replication_slot('test1_slot', 'test_decoding');
> > > SELECT * FROM pg_create_logical_replication_slot('test2_slot', 'test_decoding');
> > > EOF
> > >
> > > ./get_loop.sh &
> > >
> > > psql --quiet --tuples-only -o out1 << EOF
> > > INSERT INTO test SELECT i, (i*3)::varchar FROM generate_series(1,100000) i;
> > > SELECT * FROM pg_logical_slot_get_changes('test1_slot', NULL, NULL);
> > > EOF
> > > cat --squeeze-blank out1 | wc -l
> > >
> > > Script 2 (get_loop.sh) continuously polls for changes using a loop of
> > > pg_logical_slot_get_changes, using the second replication slot. Since the
> > > test pg database is idle, only one call returns.
>
> If this is anything more than a reproducer: don't do that, use the
> streaming protocol.
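Following Andres' advice, a hedged sketch of the streaming alternative: instead of polling pg_logical_slot_get_changes() in a SQL loop, consume the slot over the streaming replication protocol with pg_recvlogical (shipped with PostgreSQL). The database name below is a placeholder for this test setup, and the command is echoed rather than executed here, since it needs a running server:

```shell
# Stream changes from the second slot continuously, writing decoded rows to
# stdout. This replaces the get_loop.sh polling loop from the test case.
# (dbname is an assumption; substitute your own connection parameters.)
CMD="pg_recvlogical --dbname=postgres --slot=test2_slot --start -f -"
echo "$CMD"
```

With --start, the plugin already bound to the slot at creation time (test_decoding here) is used, so it does not need to be repeated on the command line.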
> > > for i in `seq 1 10000`; do
> > >   echo "SELECT * FROM pg_logical_slot_get_changes('test2_slot', NULL, NULL);"
> > > done | psql --quiet --tuples-only -o out2
> > > cat --squeeze-blank out2 | wc -l
> > >
> > > However, I get 116K or 148K rows in the output (after the 100K inserts).
> > > I can clearly see that the output jumps back after 16K or 48K rows -
> > > starting to stream the transaction from the beginning (see the first
> > > column - the offsets):
> > >
> > > ...
> > > 1/68F6E1B8 | 450854 | table public.test: INSERT: id[integer]:49149 v[character varying]:'147447'
> > > 1/68F6E248 | 450854 | table public.test: INSERT: id[integer]:49150 v[character varying]:'147450'
> > > 1/68F6E2D8 | 450854 | table public.test: INSERT: id[integer]:49151 v[character varying]:'147453'
> > > 1/68F6E368 | 450854 | table public.test: INSERT: id[integer]:49152 v[character varying]:'147456'
> > > 1/68891010 | 450854 | table public.test: INSERT: id[integer]:1 v[character varying]:'3'
> > > 1/688910D8 | 450854 | table public.test: INSERT: id[integer]:2 v[character varying]:'6'
> > > 1/68891168 | 450854 | table public.test: INSERT: id[integer]:3 v[character varying]:'9'
> > > 1/688911F8 | 450854 | table public.test: INSERT: id[integer]:4 v[character varying]:'12'
> > > ...
> > >
> > > With a larger transaction (inserting 1M or 10M rows), it gets worse,
> > > jumping back to the beginning multiple times.
> > >
> > > BTW - I don't know if it is a bug in pg_logical_slot_get_changes or in
> > > the test_decoding output plugin that I'm using to evaluate logical
> > > decoding.
> > >
> > > Any fix / workaround?
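One way to confirm the rewind mechanically from captured output is to check that the LSNs in the first column never decrease. A minimal Python sketch (the helper names are hypothetical; the sample rows are abbreviated from the output quoted above):

```python
def lsn_to_int(lsn):
    """Convert a PostgreSQL LSN like '1/68F6E1B8' to a 64-bit integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def first_jump_back(lines):
    """Return the index of the first row whose LSN is lower than its
    predecessor's, or None if the stream never rewinds."""
    prev = None
    for i, line in enumerate(lines):
        lsn = lsn_to_int(line.split("|")[0].strip())
        if prev is not None and lsn < prev:
            return i
        prev = lsn
    return None

rows = [
    "1/68F6E2D8 | 450854 | table public.test: INSERT: id[integer]:49151 ...",
    "1/68F6E368 | 450854 | table public.test: INSERT: id[integer]:49152 ...",
    "1/68891010 | 450854 | table public.test: INSERT: id[integer]:1 ...",
]
print(first_jump_back(rows))  # prints 2: the stream restarts at the transaction's first change
```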
> > I didn't try it, but it looks like this part of the code in
> > logicalfuncs.c is to blame:
> >
> > static Datum
> > pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm,
> >                                  bool binary)
> > {
> > [snip]
> >
> >     /* compute the current end-of-wal */
> >     if (!RecoveryInProgress())
> >         end_of_wal = GetFlushRecPtr();
> >     else
> >         end_of_wal = GetXLogReplayRecPtr(NULL);
> >
> > [snip]
> >
> >     while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
> >            (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
> >     {
> >
> > So we compute end_of_wal before entering the loop, but the WAL keeps
> > growing as we read through it.
>
> So?
>
> > If we do it correctly, there's potential that the loop will never finish
> > if the WAL grows faster than we can decode it.
>
> > Shouldn't we also try to re-write this SRF to support SFRM_ValuePerCall?
>
> Why? ValuePerCall gets materialized into a tuplestore as well, unless
> you call it from the select list.
>
> > Another (quite minor) problem I see is this:
> >
> > --- a/src/backend/replication/logical/logicalfuncs.c
> > +++ b/src/backend/replication/logical/logicalfuncs.c
> > @@ -301,7 +301,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
> >         upto_lsn = PG_GETARG_LSN(1);
> >
> >     if (PG_ARGISNULL(2))
> > -       upto_nchanges = InvalidXLogRecPtr;
> > +       upto_nchanges = 0;
> >     else
> >         upto_nchanges = PG_GETARG_INT32(2);
>
> Ugh, yes.
>
> Greetings,
>
> Andres Freund

--
Ofir Manor
Blog: http://ofirm.wordpress.com
LinkedIn: http://il.linkedin.com/in/ofirmanor
Twitter: @ofirm
Mobile: +972-54-7801286
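For readers following the end_of_wal discussion above: the termination concern can be illustrated with a toy model. This is pure Python, not PostgreSQL code; the function and parameter names are invented for the sketch. Computing the end pointer once before the loop guarantees the call finishes; refreshing it every iteration could spin forever if the WAL grows at least as fast as we decode it:

```python
def decode(start, wal_len, growth_per_step, refresh_end, max_steps=1000):
    """Decode one 'record' per step while concurrent activity appends WAL.
    Return the number of steps taken, or None if the loop never reaches
    end_of_wal within max_steps."""
    end_of_wal = wal_len          # snapshot taken before the loop
    pos = start
    for step in range(max_steps):
        if pos >= end_of_wal:
            return step
        pos += 1                  # decode one record
        wal_len += growth_per_step  # concurrent writes keep arriving
        if refresh_end:
            end_of_wal = wal_len  # re-read the flush pointer each iteration
    return None

print(decode(0, 10, 2, refresh_end=False))  # prints 10: terminates at the snapshot
print(decode(0, 10, 2, refresh_end=True))   # prints None: never catches up
```

This is why the pre-loop snapshot is a deliberate trade-off: the caller sees a consistent cutoff and can simply call the function again to pick up newer changes.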