From dc6d97c71394c7c216920b9aa1d55bf33c5ac472 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 24 Feb 2022 16:56:58 +0900 Subject: [PATCH 2/2] Add the origin name and remote commit-LSN to logical replication worker errcontext. This commits adds both the commit-LSN and replication origin name to the existing error context message. This will help users in specifying the origin name and commit-LSN to pg_replication_origin_advance() SQL function to skip the particular transaction. --- doc/src/sgml/logical-replication.sgml | 19 +++++-- src/backend/replication/logical/worker.c | 71 ++++++++++++++++++------ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 96b4886e08..a96cc21a1c 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -354,12 +354,21 @@ The resolution can be done either by changing data or permissions on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the + transaction that conflicts with the existing data. When a conflict produces + an error, it is shown in the subscriber's server logs as follows: + +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (c)=(1) already exists. +CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+00 from replication origin "pg_16395" + + The LSN of the transaction that contains the change violating the constraint and + the replication origin name can be found from those outputs (LSN 0/14C0378 and + replication origin pg_16395 in the above case). The transaction + can be skipped by calling the pg_replication_origin_advance() function with - a node_name corresponding to the subscription name, - and a position. The current position of origins can be seen in the - + the node_name and the next LSN of the commit LSN + (i.e., 0/14C0379) from those outputs. The current position of origins can be + seen in the pg_replication_origin_status system view. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ac49e73b45..3fe5f50806 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg /* Remote node information */ int remote_attnum; /* -1 if invalid */ TransactionId remote_xid; + XLogRecPtr commit_lsn; + char *origin_name; TimestampTz ts; /* commit, rollback, or prepare timestamp */ } ApplyErrorCallbackArg; @@ -235,6 +237,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .rel = NULL, .remote_attnum = -1, .remote_xid = InvalidTransactionId, + .commit_lsn = InvalidXLogRecPtr, + .origin_name = NULL, .ts = 0, }; @@ -334,7 +338,8 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* Functions for apply error callback */ static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); +static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, + TimestampTz ts); static inline void reset_apply_error_context_info(void); /* @@ -787,7 +792,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.committime); + set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn, + begin_data.committime); remote_final_lsn = begin_data.final_lsn; @@ -839,7 +845,8 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); + set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn, + begin_data.prepare_time); remote_final_lsn = begin_data.prepare_lsn; @@ -938,7 +945,8 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); + set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn, + prepare_data.commit_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -979,7 +987,8 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); - set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn, + rollback_data.rollback_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1044,7 +1053,8 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); + set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1126,7 +1136,7 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - set_apply_error_context_xact(stream_xid, 0); + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr, 0); /* * Initialize the worker's stream_fileset if we haven't yet. This will be @@ -1215,7 +1225,7 @@ apply_handle_stream_abort(StringInfo s) */ if (xid == subxid) { - set_apply_error_context_xact(xid, 0); + set_apply_error_context_xact(xid, InvalidXLogRecPtr, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); } else @@ -1241,7 +1251,7 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid, 0); + set_apply_error_context_xact(subxid, InvalidXLogRecPtr, 0); subidx = -1; begin_replication_step(); @@ -1426,7 +1436,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); - set_apply_error_context_xact(xid, commit_data.committime); + set_apply_error_context_xact(xid, commit_data.commit_lsn, commit_data.committime); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3507,6 +3517,17 @@ ApplyWorkerMain(Datum main_arg) myslotname = MemoryContextStrdup(ApplyContext, syncslotname); pfree(syncslotname); + + /* + * Allocate the origin name in long-lived context for error context + * message + */ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } else { @@ -3550,6 +3571,13 @@ ApplyWorkerMain(Datum main_arg) * does some initializations on the upstream so let's still call it. */ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + /* + * Allocate the origin name in long-lived context for error context + * message + */ + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } /* @@ -3673,33 +3701,40 @@ apply_error_callback(void *arg) errcontext("processing remote data during \"%s\"", logicalrep_message_type(errarg->command)); else - errcontext("processing remote data during \"%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); } else if (errarg->remote_attnum < 0) - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); else - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, errarg->rel->remoterel.attnames[errarg->remote_attnum], errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); } /* Set transaction information of apply error callback */ static inline void -set_apply_error_context_xact(TransactionId xid, TimestampTz ts) +set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, TimestampTz ts) { apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.commit_lsn = lsn; apply_error_callback_arg.ts = ts; } @@ -3710,5 +3745,5 @@ reset_apply_error_context_info(void) apply_error_callback_arg.command = 0; apply_error_callback_arg.rel = NULL; apply_error_callback_arg.remote_attnum = -1; - set_apply_error_context_xact(InvalidTransactionId, 0); + set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr, 0); } -- 2.24.3 (Apple Git-128)