From 8085e23327bc108ebd3ae5668a0c28946c4b4e84 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:21:58 +0900 Subject: [PATCH v7 1/4] Add errcontext to errors happening during applying logical replication changes. This commit adds an error context to errors that occur while applying logical replication changes, showing the command, the relation, the transaction ID, and the commit timestamp in the server log. This additional information can also be used by the follow-up commit that enables skipping a particular transaction on the subscriber. --- src/backend/replication/logical/proto.c | 51 ++++++ src/backend/replication/logical/worker.c | 203 ++++++++++++++++++++--- src/include/replication/logicalproto.h | 1 + src/tools/pgindent/typedefs.list | 1 + 4 files changed, 233 insertions(+), 23 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 52b65e9572..bb5016aa17 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1156,3 +1156,54 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * Get string representing LogicalRepMsgType. 
+ */ +char * +logicalrep_message_type(LogicalRepMsgType action) +{ + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_BEGIN_PREPARE: + return "BEGIN PREPARE"; + case LOGICAL_REP_MSG_PREPARE: + return "PREPARE"; + case LOGICAL_REP_MSG_COMMIT_PREPARED: + return "COMMIT PREPARED"; + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + return "ROLLBACK PREPARED"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_END: + return "STREAM END"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_PREPARE: + return "STREAM PREPARE"; + } + + elog(ERROR, "invalid logical replication message type \"%c\"", action); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ecaed157f2..e22b8a3903 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,6 +221,29 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +/* Struct for saving and restoring apply information */ +typedef struct ApplyErrCallbackArg +{ + LogicalRepMsgType command; /* 0 if invalid */ + + /* Local relation information */ + char *nspname; + char *relname; + + /* Remote transaction information */ + TransactionId remote_xid; + TimestampTz commit_ts; +} ApplyErrCallbackArg; + +static ApplyErrCallbackArg apply_error_callback_arg = 
+{ + .command = 0, + .nspname = NULL, + .relname = NULL, + .remote_xid = InvalidTransactionId, + .commit_ts = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -335,6 +358,13 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for apply error callback */ +static void apply_error_callback(void *arg); +static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz commit_ts); +static inline void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); +static inline void reset_apply_error_context_rel(void); +static inline void reset_apply_error_context_info(void); + /* * Should this worker apply changes for given relation. * @@ -827,6 +857,7 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + set_apply_error_context_xact(begin_data.xid, begin_data.committime); remote_final_lsn = begin_data.final_lsn; @@ -860,6 +891,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -877,6 +909,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); + set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); remote_final_lsn = begin_data.prepare_lsn; @@ -962,6 +995,7 @@ apply_handle_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -974,6 +1008,7 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); + 
set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -1001,6 +1036,7 @@ apply_handle_commit_prepared(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1013,6 +1049,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1050,6 +1087,7 @@ apply_handle_rollback_prepared(StringInfo s) process_syncing_tables(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1076,6 +1114,7 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); + set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1100,6 +1139,8 @@ apply_handle_stream_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1156,6 +1197,8 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + set_apply_error_context_xact(stream_xid, 0); + /* * Initialize the xidhash table if we haven't yet. 
This will be used for * the entire duration of the apply worker so create it in permanent @@ -1212,6 +1255,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1235,7 +1279,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. */ if (xid == subxid) + { + set_apply_error_context_xact(xid, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -1260,6 +1307,8 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + set_apply_error_context_xact(subxid, 0); + subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1284,6 +1333,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1315,6 +1365,8 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + reset_apply_error_context_info(); } /* @@ -1459,6 +1511,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + set_apply_error_context_xact(xid, commit_data.committime); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1473,6 +1526,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1592,6 +1647,9 @@ apply_handle_insert(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Initialize the executor state. 
*/ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1615,6 +1673,9 @@ apply_handle_insert(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1713,6 +1774,9 @@ apply_handle_update(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the update. */ check_relation_updatable(rel); @@ -1766,6 +1830,9 @@ apply_handle_update(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1869,6 +1936,9 @@ apply_handle_delete(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -1894,6 +1964,9 @@ apply_handle_delete(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -2328,44 +2401,62 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + ErrorContextCallback errcallback; + bool set_callback = false; + + /* + * Push apply error context callback if not yet. Other fields will be + * filled during applying the change. Since this function can be called + * recursively when applying spooled changes, we set the callback only + * once. 
+ */ + if (error_context_stack == NULL || error_context_stack->callback != apply_error_callback) + { + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + set_callback = true; + } + + apply_error_callback_arg.command = action; switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2374,49 +2465,53 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. 
*/ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); - return; + break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); - return; + break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); - return; + break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); - return; + break; case LOGICAL_REP_MSG_STREAM_PREPARE: apply_handle_stream_prepare(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Pop the error context stack */ + if (set_callback) + error_context_stack = errcallback.previous; } /* @@ -3649,3 +3744,65 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +/* Error callback to give more context info about the change being applied */ +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + + if (apply_error_callback_arg.command == 0) + return; + + initStringInfo(&buf); + appendStringInfo(&buf, _("applying \"%s\""), + logicalrep_message_type(apply_error_callback_arg.command)); + + if (apply_error_callback_arg.relname) + appendStringInfo(&buf, _(" for relation \"%s.%s\""), + apply_error_callback_arg.nspname, + apply_error_callback_arg.relname); + + if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid)) + appendStringInfo(&buf, _(" in transaction id %u with commit timestamp %s"), + apply_error_callback_arg.remote_xid, + apply_error_callback_arg.commit_ts 
== 0 + ? "(unset)" + : timestamptz_to_str(apply_error_callback_arg.commit_ts)); + + errcontext("%s", buf.data); + pfree(buf.data); +} + +/* Set transaction information of apply error callback */ +static inline void +set_apply_error_context_xact(TransactionId xid, TimestampTz commit_ts) +{ + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.commit_ts = commit_ts; +} + +/* Set relation information of apply error callback */ +static inline void +set_apply_error_context_rel(LogicalRepRelMapEntry *rel) +{ + apply_error_callback_arg.nspname = rel->remoterel.nspname; + apply_error_callback_arg.relname = rel->remoterel.relname; +} + +/* Reset relation information of apply error callback */ +static inline void +reset_apply_error_context_rel(void) +{ + apply_error_callback_arg.nspname = NULL; + apply_error_callback_arg.relname = NULL; +} + +/* Reset all information of apply error callback */ +static inline void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.command = 0; + set_apply_error_context_xact(InvalidTransactionId, 0); + reset_apply_error_context_rel(); +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 2e29513151..af89f58fd3 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -246,5 +246,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern char *logicalrep_message_type(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 37cf4b2f76..e69b708e33 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -113,6 +113,7 @@ Append AppendPath AppendRelInfo AppendState +ApplyErrCallbackArg ApplyExecutionData ApplySubXactData Archive -- 2.24.3 (Apple Git-128)