From 45f456701eb015d9e34ab28d580124981f90e420 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:21:58 +0900 Subject: [PATCH v1 2/3] Add errcontext to errors of the applying logical replication changes. --- src/backend/replication/logical/proto.c | 41 ++++++++ src/backend/replication/logical/worker.c | 119 +++++++++++++++++++---- src/include/replication/logicalproto.h | 1 + 3 files changed, 143 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1cf59e0fb0..08e63c3a89 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -898,3 +898,44 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * get string representing LogicalRepMsgType. + */ +const char * +logicalrep_action(LogicalRepMsgType action) +{ + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_END: + return "STREAM END"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + default: + return "UNKNOWN"; + } +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b90a8df166..b65f72c9a4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -149,6 +149,21 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +typedef struct ApplyErrCallbackArg +{ + LogicalRepMsgType action; /* 0 if invalid */ + LogicalRepRelMapEntry *rel; + TransactionId remote_xid; + TimestampTz committs; +} ApplyErrCallbackArg; +static ApplyErrCallbackArg apply_error_callback_arg = +{ + .action = 0, + .rel = NULL, + .remote_xid = InvalidTransactionId, + .committs = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -276,6 +291,8 @@ static bool start_skipping_changes(TransactionId xid); static bool stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data); +static void apply_error_callback(void *arg); +static void reset_apply_error_context_info(void); /* * Should this worker apply changes for given relation. @@ -788,6 +805,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; + apply_error_callback_arg.committs = begin_data.committime; remote_final_lsn = begin_data.final_lsn; @@ -828,6 +847,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -876,6 +896,7 @@ apply_handle_stream_start(StringInfo s) * streaming data and subxact info. */ begin_replication_step(); + apply_error_callback_arg.remote_xid = stream_xid; /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; @@ -941,6 +962,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -964,7 +986,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. */ if (xid == subxid) + { + apply_error_callback_arg.remote_xid = xid; stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -989,6 +1014,7 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + apply_error_callback_arg.remote_xid = subxid; subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1013,6 +1039,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1047,6 +1074,8 @@ apply_handle_stream_abort(StringInfo s) /* Stop the skipping transaction if enabled */ stop_skipping_changes(true, NULL); + + reset_apply_error_context_info(); } /* @@ -1064,6 +1093,8 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.committs = commit_data.committime; remote_final_lsn = commit_data.commit_lsn; @@ -1083,6 +1114,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1330,6 +1363,8 @@ apply_handle_insert(StringInfo s) return; } + apply_error_callback_arg.rel = rel; + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1451,6 +1486,8 @@ apply_handle_update(StringInfo s) return; } + apply_error_callback_arg.rel = rel; + /* Check if we can do the update. */ check_relation_updatable(rel); @@ -1607,6 +1644,8 @@ apply_handle_delete(StringInfo s) return; } + apply_error_callback_arg.rel = rel; + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -2066,6 +2105,7 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + ErrorContextCallback errcallback; /* * Skip all data-modification changes if we're skipping changes of this @@ -2078,43 +2118,49 @@ apply_dispatch(StringInfo s) action == LOGICAL_REP_MSG_TRUNCATE)) return; + /* Push apply error context callback */ + apply_error_callback_arg.action = action; + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2123,29 +2169,32 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. */ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* @@ -3412,3 +3461,37 @@ stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data) return true; } + +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + + initStringInfo(&buf); + appendStringInfo(&buf, _("during apply of \"%s\""), + logicalrep_action(apply_error_callback_arg.action)); + + + if (apply_error_callback_arg.rel) + appendStringInfo(&buf, _(" for relation \"%s.%s\""), + apply_error_callback_arg.rel->remoterel.nspname, + apply_error_callback_arg.rel->remoterel.relname); + + if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid)) + appendStringInfo(&buf, _(" in transaction with xid %u committs %s"), + apply_error_callback_arg.remote_xid, + apply_error_callback_arg.committs == 0 + ? "(unset)" + : timestamptz_to_str(apply_error_callback_arg.committs)); + + errcontext("%s", buf.data); +} + +static void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.action = -1; + apply_error_callback_arg.rel = NULL; + apply_error_callback_arg.remote_xid = InvalidTransactionId; + apply_error_callback_arg.committs = 0; +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 55b90c03ea..a1bd3e2d9a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -173,5 +173,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern const char *logicalrep_action(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ -- 2.24.3 (Apple Git-128)