Hi,
We would like to fetch changes from the WAL using a combination of a cursor and the SQL function pg_logical_slot_get_changes, which also takes care of the position of the LSN pointer.
We decided to use a cursor in the Python script to mitigate out-of-memory exceptions when there is a large number of changes.
What we have observed is that when an exception occurs during one of the cursor iterations, the function still advances the LSN pointer. The same thing can also be reproduced in PL/pgSQL. Is this the expected behaviour, and is there a way to prevent it?
Interestingly, when we execute the same query without the cursor in an IDE and manually interrupt the statement, the pointer does not advance.
Thanks!
Mertoz
----------
Version:
- plugin - wal2json v2
- PostgreSQL 14.7 on x86_64-pc-linux-gnu, compiled by Debian clang version 12.0.1, 64-bit
Scenarios:
-- Setup step 1: create the logical replication slot, decoded through the wal2json plugin.
SELECT pg_create_logical_replication_slot('test_slot_bq_failures', 'wal2json');
-- Setup step 2: create the test table and switch it to REPLICA IDENTITY FULL.
CREATE TABLE contact9 (
    id int,
    name varchar,
    email varchar,
    col01 varchar,
    col02 varchar,
    updated_at timestamp
);
ALTER TABLE contact9 REPLICA IDENTITY FULL;
-- Helper: count the pending changes with the peek variant, which leaves the LSN pointer untouched.
SELECT count(*)
FROM pg_logical_slot_peek_changes(
    'test_slot_bq_failures', NULL, NULL,
    'include-xids', '1',
    'include-schemas', '1',
    'include-timestamp', '1',
    'format-version', '2',
    'include-transaction', '0',
    'pretty-print', '0',
    'include-lsn', '1'
);
-- insert dummy data before each scenario
-- Generates one row per series value; col01 and col02 are filler text columns.
INSERT INTO contact9 (id, name, email, col01, col02, updated_at)
SELECT
    series.id,
    'User ' || series.id AS name,
    -- The literal must stay on one line: a line break inside the quotes would become part of every address.
    'user' || series.id || '@example.com' AS email,
    repeat(gen_random_uuid()::text, 10) AS col01, -- 36 x 10 = 360 chars
    repeat(gen_random_uuid()::text, 10) AS col02,
    current_timestamp AS updated_at
FROM generate_series(1, 1000000) AS series (id); -- large, test for manual interrupt
-- Smaller alternative row count (swap with the FROM line above):
-- FROM generate_series(1, 100) AS series (id);
-- SCENARIO 1 (plain statement, no cursor)
-- Observed results:
--   1st run + manual interrupt: none (ERROR: canceling statement due to user request) -> OK
--   2nd run: 1 000 000 -> OK
--   3rd run: 0 -> OK (the LSN was advanced by the successful 2nd run)
SELECT count(*)
FROM pg_logical_slot_get_changes(
    'test_slot_bq_failures', NULL, NULL,
    'include-xids', '1',
    'include-schemas', '1',
    'include-timestamp', '1',
    'format-version', '2',
    'include-transaction', '0',
    'pretty-print', '0',
    'include-lsn', '1'
);
-- SCENARIO 2 (cursor, PL/pgSQL)
-- Observed results:
--   1st run: 1 row + ERROR: Simulated error -> OK
--   2nd run: 0 rows -> NOT OK: the 1st run failed, so we expected to fetch the same
--            first row again and then hit the exception again
-- NOTE(review): pg_logical_slot_get_changes presumably consumes the changes as part of the
-- function call itself, independently of whether the surrounding transaction later fails.
-- Confirm against the PostgreSQL documentation. Reading with pg_logical_slot_peek_changes and
-- advancing the slot separately after successful processing may be an alternative.
DO $$
DECLARE
    changes_cur CURSOR FOR
        SELECT
            'contact9' AS target_table_name,
            lsn,
            xid,
            data
        FROM pg_logical_slot_get_changes(
            'test_slot_bq_failures', NULL, NULL,
            'include-xids', '1',
            'include-schemas', '1',
            'include-timestamp', '1',
            'format-version', '2',
            'include-transaction', '0',
            'pretty-print', '0',
            'include-lsn', '1'
        );
    change_row RECORD;
BEGIN
    OPEN changes_cur;
    LOOP
        FETCH changes_cur INTO change_row;
        -- Leave the loop once the cursor is exhausted.
        IF NOT FOUND THEN
            EXIT;
        END IF;
        -- Print the fetched record for debugging.
        RAISE NOTICE '%', change_row;
        -- Simulate a failure right after the first retrieved row; the expectation under test
        -- is that the LSN stays put and the next run returns this row again.
        RAISE EXCEPTION 'Simulated error';
    END LOOP;
    CLOSE changes_cur;
END $$;