RE: logical replication empty transactions - Mailing list pgsql-hackers

From shiy.fnst@fujitsu.com
Subject RE: logical replication empty transactions
Date
Msg-id OSZPR01MB631042EED2FF7A6F6079C54EFD039@OSZPR01MB6310.jpnprd01.prod.outlook.com
Whole thread Raw
In response to Re: logical replication empty transactions  (Ajin Cherian <itsajin@gmail.com>)
Responses Re: logical replication empty transactions  (Ajin Cherian <itsajin@gmail.com>)
Re: logical replication empty transactions  (Ajin Cherian <itsajin@gmail.com>)
List pgsql-hackers
Hi,

Here are some comments on the v21 patch.

1.
+            WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

2.
+    pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid() to test writePtr here, instead of
relying on it being non-zero?

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
             Assert(false);
     }
 
+   if (in_streaming)
+    {
+        /* If streaming, send STREAM START if we haven't yet */
+        if (txndata && !txndata->sent_stream_start)
+        pgoutput_send_stream_start(ctx, txn);
+    }
+    else
+    {
+        /* If not streaming, send BEGIN if we haven't yet */
+        if (txndata && !txndata->sent_begin_txn)
+        pgoutput_send_begin(ctx, txn);
+    }
+
+
     /* Avoid leaking memory by using and resetting our own context */
     old = MemoryContextSwitchTo(data->context);


I am not sure it is appropriate to send BEGIN or STREAM START here, because the
row filter has not been checked yet. That means transactions that become empty
because of the row filter are not skipped.

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
                             ReorderBufferTXN *txn,
                             XLogRecPtr prepare_lsn)
 {
+    PGOutputTxnData *txndata = txn->output_plugin_private;
+    bool            sent_begin_txn = txndata->sent_begin_txn;
+
     Assert(rbtxn_is_streamed(txn));
 
-    OutputPluginUpdateProgress(ctx);
+    pfree(txndata);
+    txn->output_plugin_private = NULL;
+
+    if (!sent_begin_txn)
+    {
+        elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+        return;
+    }
+
+    OutputPluginUpdateProgress(ctx, false);
     OutputPluginPrepareWrite(ctx, true);
     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
     OutputPluginWrite(ctx, true);

I notice that the patch skips stream prepare for an empty transaction. This
would cause an error on the subscriber side when the transaction is later
committed (COMMIT PREPARED) on the publisher side, so I think we'd better not
do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber 
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR:  prepared transaction with identifier "pg_gid_16391_722" does not exist


Regards,
Shi yu

pgsql-hackers by date:

Previous
From: "osumi.takamichi@fujitsu.com"
Date:
Subject: RE: logical replication restrictions
Next
From: Tom Lane
Date:
Subject: Re: Condition pushdown: why (=) is pushed down into join, but BETWEEN or >= is not?