From 76f578416503478bf5e39993eec4bbd0f17d5a17 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Fri, 16 Oct 2020 12:31:35 +0530 Subject: [PATCH] Enumize top level logical replication actions Logical replication protocol uses single byte character to identify different chunks of logical repliation messages. The code uses string literals for the same. Enumize those so that 1. All the string literals used can be found at a single place. This makes it easy to add more actions without the risk of conflicts. 2. It's easy to locate the code handling a given action. Ashutosh Bapat --- src/backend/replication/logical/proto.c | 26 +++---- src/backend/replication/logical/worker.c | 87 ++++++++++++------------ src/include/replication/logicalproto.h | 25 +++++++ 3 files changed, 81 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142b48..fdb31182d7 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in); void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) { - pq_sendbyte(out, 'B'); /* BEGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); /* fixed fields */ pq_sendint64(out, txn->final_lsn); @@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'C'); /* sending COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -112,7 +112,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn) { - pq_sendbyte(out, 'O'); /* ORIGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); /* fixed fields */ pq_sendint64(out, origin_lsn); @@ -141,7 +141,7 @@ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'I'); /* action INSERT */ + pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -185,7 +185,7 @@ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'U'); /* action UPDATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, 'D'); /* action DELETE */ + pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out, int i; uint8 flags = 0; - pq_sendbyte(out, 'T'); /* action TRUNCATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; - pq_sendbyte(out, 'R'); /* sending RELATION */ + pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) HeapTuple tup; Form_pg_type typtup; - pq_sendbyte(out, 'Y'); /* sending TYPE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -755,7 +755,7 @@ void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment) { - pq_sendbyte(out, 'S'); /* action STREAM START */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); Assert(TransactionIdIsValid(xid)); @@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) void logicalrep_write_stream_stop(StringInfo out) { - pq_sendbyte(out, 'E'); /* action STREAM END */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); } /* @@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); Assert(TransactionIdIsValid(txn->xid)); @@ -849,7 +849,7 @@ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid) { - pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3a5b733ee3..1e3a3e63ac 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1897,67 +1897,66 @@ apply_handle_truncate(StringInfo s) static void apply_dispatch(StringInfo s) { - char action = pq_getmsgbyte(s); + LogicalRepMsgType action = pq_getmsgbyte(s); switch (action) { - /* BEGIN */ - case 'B': + case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - break; - /* COMMIT */ - case 'C': + return; + + case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - break; - /* INSERT */ - case 'I': + return; + + case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - break; - /* UPDATE */ - case 'U': + return; + + case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - break; - /* DELETE */ - case 'D': + return; + + case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - break; - /* TRUNCATE */ - case 'T': + return; + + case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - break; - /* RELATION */ - case 'R': + return; + + case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - break; - /* TYPE */ - case 'Y': + return; + + case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - break; - /* ORIGIN */ - case 'O': + return; + + case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - break; - /* STREAM START */ - case 'S': + return; + + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - break; - /* STREAM END */ - case 'E': + return; + + case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - break; - /* STREAM ABORT */ - case 'A': + return; + + case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - break; - /* STREAM COMMIT */ - case 'c': + return; + + case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type \"%c\"", action))); + return; } + + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0c2cda264e..15ee2304c8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -33,6 +33,31 @@ #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +/* + * Logical message types + * + * Used by logical replication wire protocol. + * + * Note: though this is an enum it should fit a single byte and should be a + * printable character. + */ +typedef enum +{ + LOGICAL_REP_MSG_BEGIN = 'B', + LOGICAL_REP_MSG_COMMIT = 'C', + LOGICAL_REP_MSG_ORIGIN = 'O', + LOGICAL_REP_MSG_INSERT = 'I', + LOGICAL_REP_MSG_UPDATE = 'U', + LOGICAL_REP_MSG_DELETE = 'D', + LOGICAL_REP_MSG_TRUNCATE = 'T', + LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_STREAM_START = 'S', + LOGICAL_REP_MSG_STREAM_END = 'E', + LOGICAL_REP_MSG_STREAM_COMMIT = 'c', + LOGICAL_REP_MSG_STREAM_ABORT = 'A', +} LogicalRepMsgType; + /* * This struct stores a tuple received via logical replication. * Keep in mind that the columns correspond to the *remote* table. -- 2.17.1