From 97a21e11ff5df940a35f1cad62c56c320e92dd78 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 7 Dec 2021 00:25:49 +0900 Subject: [PATCH 2/2] Improve input data check of logical replication. --- src/backend/replication/logical/proto.c | 47 +++++++++++++++++++------ 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..c85cad7859 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -20,6 +20,11 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" +/* Macros for checking input data */ +#define LOGICALREP_CHECK_INVALID_LSN(lsn) unlikely(XLogRecPtrIsInvalid(lsn)) +#define LOGICALREP_CHECK_INVALID_TIMESTAMP(ts) unlikely((ts) == 0) +#define LOGICALREP_CHECK_INVALID_XID(xid) unlikely(!TransactionIdIsValid(xid)) + /* * Protocol message flags. */ @@ -61,7 +66,7 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) { /* read fields */ begin_data->final_lsn = pq_getmsgint64(in); - if (begin_data->final_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->final_lsn)) elog(ERROR, "final_lsn not set in begin message"); begin_data->committime = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -132,10 +137,10 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da { /* read fields */ begin_data->prepare_lsn = pq_getmsgint64(in); - if (begin_data->prepare_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->prepare_lsn)) elog(ERROR, "prepare_lsn not set in begin prepare message"); begin_data->end_lsn = pq_getmsgint64(in); - if (begin_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->end_lsn)) elog(ERROR, "end_lsn not set in begin prepare message"); begin_data->prepare_time = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -204,14 +209,16 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype, /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); - if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->prepare_lsn)) elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(prepare_data->prepare_time)) + elog(ERROR, "prepare_time is not set in %s message", msgtype); prepare_data->xid = pq_getmsgint(in, 4); - if (prepare_data->xid == InvalidTransactionId) + if (LOGICALREP_CHECK_INVALID_XID(prepare_data->xid)) elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype); /* read gid (copy it into a pre-allocated buffer) */ @@ -271,13 +278,17 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * /* read fields */ prepare_data->commit_lsn = pq_getmsgint64(in); - if (prepare_data->commit_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->commit_lsn)) elog(ERROR, "commit_lsn is not set in commit prepared message"); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in commit prepared message"); prepare_data->commit_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(prepare_data->commit_time)) + elog(ERROR, "commit_time is not set in commit prepared message"); prepare_data->xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(prepare_data->xid)) + elog(ERROR, "invalid two-phase transaction ID in commit prepared message"); /* read gid (copy it into a pre-allocated buffer) */ strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); @@ -330,14 +341,20 @@ logicalrep_read_rollback_prepared(StringInfo in, /* read fields */ rollback_data->prepare_end_lsn = pq_getmsgint64(in); - if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(rollback_data->prepare_end_lsn)) elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); rollback_data->rollback_end_lsn = pq_getmsgint64(in); - if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(rollback_data->rollback_end_lsn)) elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); rollback_data->prepare_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(rollback_data->prepare_time)) + elog(ERROR, "prepare_time is not set in rollback prepared message"); rollback_data->rollback_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(rollback_data->rollback_time)) + elog(ERROR, "rollback_time is not set in rollback prepared message"); rollback_data->xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(rollback_data->xid)) + elog(ERROR, "invalid two-phase transaction ID in rollback prepared message"); /* read gid (copy it into a pre-allocated buffer) */ strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid)); @@ -1063,6 +1080,8 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) Assert(first_segment); xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(xid)) + elog(ERROR, "xid not set in stream start message"); *first_segment = (pq_getmsgbyte(in) == 1); return xid; @@ -1121,7 +1140,11 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_LSN(commit_data->commit_lsn)) + elog(ERROR, "commit_lsn not set in stream commit message"); commit_data->end_lsn = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_LSN(commit_data->end_lsn)) + elog(ERROR, "end_lsn not set in stream commit message"); commit_data->committime = pq_getmsgint64(in); return xid; @@ -1154,7 +1177,11 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, Assert(xid && subxid); *xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(*xid)) + elog(ERROR, "xid not set in stream abort message"); *subxid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(*subxid)) + elog(ERROR, "subxid not set in stream abort message"); } /* -- 2.24.3 (Apple Git-128)