From 98c5447e985221bd0e49583db7e73ea2154eff24 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:21:58 +0900 Subject: [PATCH v3 1/3] Add errcontext to errors raised while applying logical replication changes. This commit adds an error context to errors raised while applying logical replication changes, showing the command, the relation, the transaction ID, and the commit timestamp in the server log. --- src/backend/commands/tablecmds.c | 7 + src/backend/replication/logical/proto.c | 49 +++++ src/backend/replication/logical/worker.c | 220 ++++++++++++++++++++--- src/include/replication/logicalproto.h | 1 + src/include/replication/logicalworker.h | 2 + 5 files changed, 257 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a16e749506..a500abaf2f 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -78,6 +78,7 @@ #include "partitioning/partbounds.h" #include "partitioning/partdesc.h" #include "pgstat.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" @@ -1897,6 +1898,9 @@ ExecuteTruncateGuts(List *explicit_rels, continue; } + /* Set logical replication error callback info if necessary */ + set_logicalrep_error_context_rel(rel); + /* * Build the lists of foreign tables belonging to each foreign server * and pass each list to the foreign data wrapper's callback function, @@ -2004,6 +2008,9 @@ ExecuteTruncateGuts(List *explicit_rels, pgstat_count_truncate(rel); } + /* Reset logical replication error callback info */ + reset_logicalrep_error_context_rel(); + /* Now go through the hash table, and truncate foreign tables */ if (ft_htab) { diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a245252529..54fff7df21 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c 
@@ -1109,3 +1109,60 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * Get a string representing a LogicalRepMsgType. + * + * This is called from the apply worker's error context callback, so it must + * never raise an error itself: an unrecognized type yields a "??? (n)" + * placeholder, kept in a static buffer that the next such call overwrites. + */ +char * +logicalrep_message_type(LogicalRepMsgType action) +{ + static char err_unknown[20]; + + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_END: + return "STREAM END"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + case LOGICAL_REP_MSG_BEGIN_PREPARE: + return "BEGIN PREPARE"; + case LOGICAL_REP_MSG_PREPARE: + return "PREPARE"; + case LOGICAL_REP_MSG_COMMIT_PREPARED: + return "COMMIT PREPARED"; + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + return "ROLLBACK PREPARED"; + } + + /* Not a known type: report its numeric code instead of elog(ERROR). */ + snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action); + return err_unknown; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 186be1a188..d346377b20 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,6 +221,27 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +/* Struct for saving and restoring apply information */ +typedef struct ApplyErrCallbackArg +{ + LogicalRepMsgType command; /* 0 if invalid */ + + /* Local relation information */ + char *nspname; /* used for error context */ + char *relname; /* used for 
error context */ + + TransactionId remote_xid; + TimestampTz committs; +} ApplyErrCallbackArg; +static ApplyErrCallbackArg apply_error_callback_arg = +{ + .command = 0, + .relname = NULL, + .nspname = NULL, + .remote_xid = InvalidTransactionId, + .committs = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -332,6 +353,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +static void apply_error_callback(void *arg); +static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); +static void reset_apply_error_context_rel(void); +static void reset_apply_error_context_info(void); /* * Should this worker apply changes for given relation. @@ -825,6 +850,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; + apply_error_callback_arg.committs = begin_data.committime; remote_final_lsn = begin_data.final_lsn; @@ -858,6 +885,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -875,6 +903,8 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; + apply_error_callback_arg.committs = begin_data.prepare_time; remote_final_lsn = begin_data.prepare_lsn; @@ -949,6 +979,7 @@ apply_handle_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -961,6 +992,8 @@ apply_handle_commit_prepared(StringInfo s) char 
gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); + apply_error_callback_arg.remote_xid = prepare_data.xid; + apply_error_callback_arg.committs = prepare_data.commit_time; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -988,6 +1021,7 @@ apply_handle_commit_prepared(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1000,6 +1034,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + apply_error_callback_arg.remote_xid = rollback_data.xid; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1037,6 +1072,7 @@ apply_handle_rollback_prepared(StringInfo s) process_syncing_tables(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1093,6 +1129,8 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + apply_error_callback_arg.remote_xid = stream_xid; + /* * Initialize the xidhash table if we haven't yet. This will be used for * the entire duration of the apply worker so create it in permanent @@ -1149,6 +1187,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1172,7 +1211,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. 
*/ if (xid == subxid) + { + apply_error_callback_arg.remote_xid = xid; stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -1197,6 +1239,7 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + apply_error_callback_arg.remote_xid = subxid; subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1221,6 +1264,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1252,6 +1296,8 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + reset_apply_error_context_info(); } /* @@ -1276,6 +1322,8 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.committs = commit_data.committime; elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1398,6 +1446,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1517,6 +1567,9 @@ apply_handle_insert(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1540,6 +1593,9 @@ apply_handle_insert(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1638,6 +1694,9 @@ apply_handle_update(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the update. 
*/ check_relation_updatable(rel); @@ -1691,6 +1750,9 @@ apply_handle_update(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1794,6 +1856,9 @@ apply_handle_delete(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -1819,6 +1884,9 @@ apply_handle_delete(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -2223,6 +2291,9 @@ apply_handle_truncate(StringInfo s) * Even if we used CASCADE on the upstream primary we explicitly default * to replaying changes without further cascading. This might be later * changeable with a user specified option. + * + * Both namespace and relation name for error callback will be set in + * ExecuteTruncateGuts(). */ ExecuteTruncateGuts(rels, relids, @@ -2253,44 +2324,54 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + ErrorContextCallback errcallback; + + /* + * Push apply error context callback. Other fields will be filled during + * applying the change. 
+ */ + apply_error_callback_arg.command = action; + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2299,45 +2380,48 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. 
*/ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); - return; + break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); - return; + break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); - return; + break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* @@ -3570,3 +3654,95 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + + if (apply_error_callback_arg.command == 0) + return; + + initStringInfo(&buf); + appendStringInfo(&buf, _("during apply of \"%s\""), + logicalrep_message_type(apply_error_callback_arg.command)); + + if (apply_error_callback_arg.relname) + appendStringInfo(&buf, _(" for relation \"%s.%s\""), + apply_error_callback_arg.nspname, + apply_error_callback_arg.relname); + + if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid)) + appendStringInfo(&buf, _(" in transaction with xid %u committs %s"), + apply_error_callback_arg.remote_xid, + apply_error_callback_arg.committs == 0 + ? 
"(unset)" + : timestamptz_to_str(apply_error_callback_arg.committs)); + + errcontext("%s", buf.data); +} + +/* Set relation information of apply error callback */ +static void +set_apply_error_context_rel(LogicalRepRelMapEntry *rel) +{ + apply_error_callback_arg.nspname = rel->remoterel.nspname; + apply_error_callback_arg.relname = rel->remoterel.relname; +} + +/* Reset relation information of apply error callback */ +static void +reset_apply_error_context_rel(void) +{ + apply_error_callback_arg.nspname = NULL; + apply_error_callback_arg.relname = NULL; +} + +/* Reset all information of apply error callback */ +static void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.command = 0; + apply_error_callback_arg.remote_xid = InvalidTransactionId; + apply_error_callback_arg.committs = 0; + reset_apply_error_context_rel(); +} + +/* + * Set relation information of error callback. + * + * Both set_logicalrep_error_context_rel() and + * reset_logicalrep_error_context_rel() functions are intended to be + * used by functions outside of logical replication module where don't + * use LogicalRepRelMapEntry. + * + * The caller must call reset_logicalrep_error_context_rel() after use + * so we free the memory used for names. 
+ */ +void +set_logicalrep_error_context_rel(Relation rel) +{ + if (IsLogicalWorker()) + { + apply_error_callback_arg.nspname = + get_namespace_name(RelationGetNamespace(rel)); + apply_error_callback_arg.relname = + pstrdup(RelationGetRelationName(rel)); + } +} + +/* Reset relation information for error callback set */ +void +reset_logicalrep_error_context_rel(void) +{ + if (IsLogicalWorker()) + { + if (apply_error_callback_arg.nspname) + pfree(apply_error_callback_arg.nspname); + apply_error_callback_arg.nspname = NULL; + + if (apply_error_callback_arg.relname) + pfree(apply_error_callback_arg.relname); + apply_error_callback_arg.relname = NULL; + } +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 63de90d94a..c78a4409bc 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -242,5 +242,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern char *logicalrep_message_type(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 2ad61a001a..d3e8514ffd 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -15,5 +15,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void set_logicalrep_error_context_rel(Relation rel); +extern void reset_logicalrep_error_context_rel(void); #endif /* LOGICALWORKER_H */ -- 2.24.3 (Apple Git-128)