On Mon, Nov 2, 2020 at 4:11 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
Few Comments on v15-0003-Support-2PC-txn-pgoutput
===============================================
1. This patch needs to be rebased after commit 644f0d7cc9 and requires
some adjustments accordingly.
2.
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit message", flags);
+
/* read fields */
commit_data->commit_lsn = pq_getmsgint64(in);
Spurious line.
3.
@@ -720,6 +722,7 @@ apply_handle_commit(StringInfo s)
replorigin_session_origin_timestamp = commit_data.committime;
CommitTransactionCommand();
+
pgstat_report_stat(false);
Spurious line
4.
+static void
+apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data)
+{
+ Assert(prepare_data->prepare_lsn == remote_final_lsn);
+
+ /* The synchronization worker runs in single transaction. */
+ if (IsTransactionState() && !am_tablesync_worker())
+ {
+ /* End the earlier transaction and start a new one */
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+ StartTransactionCommand();
There is no explanation as to why you want to end the previous
transaction and start a new one. Even if we have to do so, we first
need to call BeginTransactionBlock before CommitTransactionCommand.
5.
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
+ * Returns how many changes were applied.
*/
-static void
-apply_handle_stream_commit(StringInfo s)
+static int
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
{
- TransactionId xid;
Can we have a separate patch for this as this can be committed before
main patch. This is a refactoring required for the main patch.
6.
@@ -57,7 +63,8 @@ static void pgoutput_stream_abort(struct
LogicalDecodingContext *ctx,
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
-
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
Spurious line removal.
--
With Regards,
Amit Kapila.