RE: logical replication empty transactions - Mailing list pgsql-hackers
From | shiy.fnst@fujitsu.com |
---|---|
Subject | RE: logical replication empty transactions |
Date | |
Msg-id | OSZPR01MB631042EED2FF7A6F6079C54EFD039@OSZPR01MB6310.jpnprd01.prod.outlook.com Whole thread Raw |
In response to | Re: logical replication empty transactions (Ajin Cherian <itsajin@gmail.com>) |
Responses |
Re: logical replication empty transactions
Re: logical replication empty transactions |
List | pgsql-hackers |
Hi, Here are some comments on the v21 patch. 1. + WalSndKeepalive(false, 0); Maybe we can use InvalidXLogRecPtr here, instead of 0. 2. + pq_sendint64(&output_message, writePtr ? writePtr : sentPtr); Similarly, should we use XLogRecPtrIsInvalid()? 3. @@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); I am not sure if it is suitable to send begin or stream_start here, because the row filter is not checked yet. That means, empty transactions caused by row filter are not skipped. 4. @@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare"); + return; + } + + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); I notice that the patch skips stream prepared transaction, this would cause an error on subscriber side when committing this transaction on publisher side, so I think we'd better not do that. For example: (set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in postgresql.conf) -- publisher create table test (a int, b text, primary key(a)); create table test2 (a int, b text, primary key(a)); create publication pub for table test; -- subscriber create table test (a int, b text, primary key(a)); create table test2 (a int, b text, primary key(a)); create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on); -- publisher begin; INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i); prepare transaction 't'; commit prepared 't'; The error message in subscriber log: ERROR: prepared transaction with identifier "pg_gid_16391_722" does not exist Regards, Shi yu
pgsql-hackers by date: