Thread: Multiple message delivery on logical replication

Multiple message delivery on logical replication

From
Christophe Pettus
Date:
I'm working with the logical replication support in psycopg2, and have found something surprising... this may be my
error, of course!

My sample program is below.  It works wonderfully, except that each time it starts, it re-receives the last message
that it handled, even though it flushed that message.

Example:

postgres@localhost:~/wal2pubsub$ python waltest.py

{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
^C
postgres@localhost:~/wal2pubsub$ python waltest.py

{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}

There was no database activity in that period; it just replayed the same message.  Shouldn't it have flushed to the end
of the WAL stream and not reprocessed the last message?

--

import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

conn = psycopg2.connect('dbname=postgres', connection_factory=LogicalReplicationConnection)
cur = conn.cursor()

# Stream changes from an existing logical replication slot.
cur.start_replication(slot_name='test_slot', slot_type=REPLICATION_LOGICAL)

def consume(msg):
    print(msg.payload)
    # Report this message's start LSN to the server as flushed.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

try:
    cur.consume_stream(consume)
except KeyboardInterrupt:
    # Exit quietly on ^C.
    pass

--
-- Christophe Pettus
   xof@thebuild.com



Re: Multiple message delivery on logical replication

From
"Jonathan S. Katz"
Date:
> On Nov 2, 2018, at 9:28 PM, Christophe Pettus <xof@thebuild.com> wrote:
>
> I'm working with the logical replication support in psycopg2, and have found something surprising... this may be my
> error, of course!
>
> My sample program is below.  It works wonderfully, but in the case when it starts, it re-receives the last message
> that it handled, even with flushing it.

I’ve had this happen as well. IIRC it was when I was using it in async
mode with wal2json. From the investigating I did at the time, I thought
it was a bug with wal2json instead of psycopg2, but it was a while ago
and my memory is fuzzy :(

Jonathan



Re: Multiple message delivery on logical replication

From
Christophe Pettus
Date:

> On Nov 2, 2018, at 18:31, Jonathan S. Katz <jonathan.katz@excoventures.com> wrote:
> I thought it was a bug with wal2json instead of psycopg2 [...]

That appears to be the case; test_decoding doesn't show the same behavior.
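
One way a consumer might guard against this kind of replay, whichever output plugin is in use, is to remember the start LSN of the last message it handled and skip a message that arrives with that same LSN after a restart. The following is only a minimal sketch, not something taken from psycopg2 or wal2json: ReplayGuard and the state file path are hypothetical names, and it assumes a replayed message carries the same data_start as on its first delivery.

```python
import os


class ReplayGuard:
    """Wrap a handler so the last handled message is not processed twice.

    Hypothetical helper: it only recognises a replay of the single most
    recently handled message, which is the symptom described in this thread.
    """

    def __init__(self, handler, state_path):
        self.handler = handler
        self.state_path = state_path  # assumed location for the saved LSN
        self.last_lsn = self._load()

    def _load(self):
        # Return the LSN saved by a previous run, or None on first start.
        try:
            with open(self.state_path) as f:
                return int(f.read().strip())
        except (FileNotFoundError, ValueError):
            return None

    def _save(self, lsn):
        # Write to a temporary file, then rename, so a crash cannot leave
        # a half-written state file behind.
        tmp = self.state_path + ".tmp"
        with open(tmp, "w") as f:
            f.write(str(lsn))
        os.replace(tmp, self.state_path)

    def __call__(self, msg):
        if msg.data_start != self.last_lsn:
            self.handler(msg.payload)
            self.last_lsn = msg.data_start
            self._save(msg.data_start)
        # Acknowledge the message either way, as the original program does.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)
```

In the sample program this would be passed in place of consume, for example cur.consume_stream(ReplayGuard(print, 'last_lsn.txt')). It makes the consumer tolerant of a repeated delivery; it does not change what the server or the output plugin sends.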

--
-- Christophe Pettus
  xof@thebuild.com