From 3f996c0bb9f8166990a4289cdc512c91cedd9a78 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 23 Nov 2020 14:55:04 +0530 Subject: [PATCH v3] Fix replication of in-progress transactions in tablesync worker. Tablesync worker runs under a single transaction but in streaming mode, we were committing the transaction on stream_stop, stream_abort, and stream_commit. We need to avoid committing the transaction in a streaming mode in tablesync worker. Author: Dilip Kumar Reviewed-by: Amit Kapila Tested-by: Peter Smith Discussion: https://postgr.es/m/CAHut+Pt4PyKQCwqzQ=EFF=bpKKJD7XKt_S23F6L20ayQNxg77A@mail.gmail.com --- src/backend/replication/logical/worker.c | 97 +++++++++++++----------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 04684912de..49ad577892 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -224,6 +224,8 @@ static void maybe_reread_subscription(void); /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); +static void apply_handle_commit_internal(StringInfo s, + LogicalRepCommitData* commit_data); static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot); static void apply_handle_update_internal(ResultRelInfo *relinfo, @@ -709,32 +711,7 @@ apply_handle_commit(StringInfo s) Assert(commit_data.commit_lsn == remote_final_lsn); - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) - { - /* - * Update origin state so we can restart streaming from correct - * position in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); - } - else - { - /* Process any invalidation messages that might have accumulated. */ - AcceptInvalidationMessages(); - maybe_reread_subscription(); - } - - in_remote_transaction = false; - - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + apply_handle_commit_internal(s, &commit_data); pgstat_report_activity(STATE_IDLE, NULL); } @@ -825,8 +802,12 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* Commit the per-stream transaction */ - CommitTransactionCommand(); + /* The synchronization worker runs in single transaction. */ + if (!am_tablesync_worker()) + { + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + } in_streamed_transaction = false; @@ -902,7 +883,10 @@ apply_handle_stream_abort(StringInfo s) { /* Cleanup the subxact info */ cleanup_subxact_info(); - CommitTransactionCommand(); + + /* The synchronization worker runs in single transaction */ + if (!am_tablesync_worker()) + CommitTransactionCommand(); return; } @@ -928,7 +912,9 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - CommitTransactionCommand(); + + if (!am_tablesync_worker()) + CommitTransactionCommand(); } } @@ -1048,28 +1034,13 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); pfree(s2.data); - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); - elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - in_remote_transaction = false; - - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); @@ -1077,6 +1048,40 @@ apply_handle_stream_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +/* + * Helper function for apply_handle_commit and apply_handle_stream_commit. + */ +static void +apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data) +{ + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); +} + /* * Handle RELATION message. * -- 2.28.0.windows.1