Thread: Logical replication (pgoutput plugin) in streaming mode: peek() always starts from beginning of transaction, not from latest stream block

Hi everyone,

When using logical replication with the pgoutput plugin on PG 16, we do the following:
  1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false')
  2) Get LSN of last row (Commit)
  3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>);  
  4) Repeat.
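Steps 2 and 3 come down to taking the LSN of the last row of each batch. Below is a minimal sketch. It assumes a Python client that receives rows as (lsn, xid, data) tuples, with lsn in its text form and data as bytes. The function name advance_target is hypothetical.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical row shape: (lsn as 'X/Y' text, xid, raw bytea data as bytes)
Row = Tuple[str, Optional[int], bytes]


def advance_target(rows: Sequence[Row]) -> Optional[str]:
    """Pick the LSN for pg_replication_slot_advance() after a non-streaming peek.

    With streaming = 'false', pgoutput emits complete transactions, so a
    non-empty batch should end with a Commit message, whose first byte is 'C'.
    Returns None when there is nothing to confirm.
    """
    if not rows:
        return None
    lsn, _xid, data = rows[-1]
    if data[:1] != b"C":
        # Unexpected: the batch did not end on a transaction boundary.
        return None
    return lsn


# Illustrative values only, not taken from the demo; payloads are omitted.
batch = [
    ("2/98CE100", 1000, b"B"),  # Begin
    ("2/98CE100", 1000, b"I"),  # Insert
    ("2/98CE1A0", 1000, b"C"),  # Commit
]
print(advance_target(batch))  # 2/98CE1A0
```

The returned string is what gets passed as the second argument of pg_replication_slot_advance().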

This works perfectly well when streaming = false. With streaming turned on, we expected the same thing to happen, except that the LSN passed to pg_replication_slot_advance() would be that of a Stream End record, and the next call to pg_logical_slot_peek_binary_changes() would return the subsequent Stream Start record. Instead, the output starts over from the beginning of the transaction (its first Stream Start record). Observe:

*** Demo starts ***
*** Initially there are no changes, peek() returns nothing: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
 lsn | xid | data
-----+-----+------
(0 rows)

*** Slot status: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 row)


*** Now make some changes (delete then insert a bunch of records) and call peek()          ***
*** The predicate filters out Delete and Insert records, leaving Stream Start (\x53 = S),  ***
*** Relation (\x52 = R), Stream End (\x45 = E, called Stream Stop in the protocol docs),   ***
*** and Stream Commit (\x63 = c)                                                           ***

abinitio=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** It was a peek() so the status is unchanged: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 row)

*** Now advance the slot to the first Stream End record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/C141BE8
(1 row)

*** confirmed_flush_lsn is updated as expected: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/9B09D10   | 2/C141BE8
(1 row)

*** Now peek() again. It is starting from earlier than confirmed_flush_lsn: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** Next advance to the Stream Commit record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/DF59950
(1 row)

*** This time the peek() starts from the correct LSN: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(8 rows)

*** End of demo ***
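The data column of the streaming rows above can be decoded with a short script. This is a sketch under stated assumptions:

- The message layouts follow the "Logical Replication Message Formats" page of the PostgreSQL documentation: \x53 is Stream Start, \x45 is Stream Stop (the Stream End record in the demo), and \x63 is Stream Commit.
- data is assumed to arrive as Python bytes.
- The function names are hypothetical.
- Every other message type is passed through undecoded.

```python
import struct
from datetime import datetime, timedelta, timezone

# pgoutput timestamps count microseconds since 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def format_lsn(value: int) -> str:
    """Render a 64-bit LSN in PostgreSQL's 'X/Y' text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def decode_stream_message(data: bytes) -> dict:
    """Decode the three streaming messages seen in the demo; other tags pass through."""
    tag = data[:1]
    if tag == b"S":
        # Stream Start: Int32 xid, Int8 first-segment flag (1 = first segment)
        xid, first = struct.unpack_from("!IB", data, 1)
        return {"type": "stream_start", "xid": xid, "first_segment": first == 1}
    if tag == b"E":
        # Stream Stop: no payload beyond the tag byte
        return {"type": "stream_stop"}
    if tag == b"c":
        # Stream Commit: Int32 xid, Int8 flags (unused), Int64 commit LSN,
        # Int64 end LSN, Int64 commit timestamp
        xid, _flags, commit_lsn, end_lsn, ts = struct.unpack_from("!IBQQq", data, 1)
        return {
            "type": "stream_commit",
            "xid": xid,
            "commit_lsn": format_lsn(commit_lsn),
            "end_lsn": format_lsn(end_lsn),
            "commit_time": PG_EPOCH + timedelta(microseconds=ts),
        }
    return {"type": "other", "tag": tag.decode("ascii", "replace")}


# Stream Commit row of xid 1112 from the demo (psql displays it as \x63...)
msg = decode_stream_message(
    bytes.fromhex("630000045800000000020df59918000000020df599500002aca72900f8a8")
)
print(msg["xid"], msg["commit_lsn"], msg["end_lsn"])  # 1112 2/DF59918 2/DF59950
```

The end LSN decoded from the Stream Commit matches that row's lsn column. Decoding the Stream Start rows gives xid 1112 or 1114, with the first-segment flag set only on each transaction's first block (\x...01). That first block is the one the second peek() starts from again.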

The question is whether this is by design or a bug. If it is by design, could someone explain how it is meant to be used? That part is not clear to us. It does eventually work when the upto_nchanges argument is NULL: once the transaction completes we get a Stream Commit record and can advance past it. In the meantime, however, we will have ingested a lot of duplicate records that we now have to deal with.

If upto_nchanges is not NULL, we are stuck. peek() returns one or more Stream blocks until the number of returned rows exceeds upto_nchanges, then returns the same blocks over and over again, forever. We cannot advance past them, and we never see the Stream Commit record.
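The duplicate records mentioned above can be counted by comparing each row's LSN with the slot's confirmed_flush_lsn. Below is a minimal sketch with hypothetical helper names, using rows reduced to (lsn, xid, message tag) for brevity. LSNs are compared as integers because the 'X/Y' text form does not sort correctly as a string: '2/DF59950' sorts after '2/108918D0' even though it is the lower LSN.

```python
from typing import List, Sequence, Tuple

# Hypothetical, simplified row shape: (lsn, xid, first byte of data as a letter)
Row = Tuple[str, int, str]


def lsn_to_int(lsn: str) -> int:
    """Convert the text form of a pg_lsn ('X/Y', both parts hex) to an integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def rows_at_or_before(rows: Sequence[Row], confirmed_flush_lsn: str) -> List[Row]:
    """Return the rows whose LSN is not past the slot's confirmed_flush_lsn."""
    limit = lsn_to_int(confirmed_flush_lsn)
    return [row for row in rows if lsn_to_int(row[0]) <= limit]


# Second peek() from the demo, with each data value reduced to its message tag.
second_peek = [
    ("2/A222A20", 1112, "S"), ("2/A222A20", 1112, "R"), ("2/C141BE8", 1112, "E"),
    ("2/C141C28", 1112, "S"), ("2/DF598D8", 1112, "E"), ("2/DF59950", 1112, "c"),
    ("2/DF59950", 1114, "S"), ("2/DF59950", 1114, "R"), ("2/108918D0", 1114, "E"),
    ("2/108918D0", 1114, "S"), ("2/131E1310", 1114, "E"), ("2/131E1310", 1114, "S"),
    ("2/137D7768", 1114, "E"), ("2/137E8448", 1114, "c"),
]

# Prints the three xid 1112 rows at 2/A222A20 and 2/C141BE8.
for row in rows_at_or_before(second_peek, "2/C141BE8"):
    print(row)
```

With confirmed_flush_lsn = 2/C141BE8, it flags the first three rows of the second peek() (the Stream Start, Relation and Stream End of xid 1112). With 2/DF59950 it would flag eight rows. Those eight include the Stream Start and Relation of xid 1114, which share that LSN with the Stream Commit of 1112 and are still returned by the third peek(). So an LSN comparison alone may not be enough to discard duplicates.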

Thank you.

Guillaume.