From 4040ada4886220bb979edab83f7f88d32679dcbd Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sat, 27 Apr 2024 15:48:25 +0000 Subject: [PATCH v20 4/5] Optimize Logical Replication apply with multi inserts --- src/backend/executor/execReplication.c | 39 +++ src/backend/replication/logical/proto.c | 24 ++ src/backend/replication/logical/worker.c | 351 ++++++++++++++++++++++- src/include/executor/executor.h | 4 + src/include/replication/logicalproto.h | 2 + src/tools/pgindent/typedefs.list | 2 + 6 files changed, 409 insertions(+), 13 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..fae1375537 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, } } +void +ExecRelationMultiInsert(TableModifyState *MultiInsertState, + ResultRelInfo *resultRelInfo, + EState *estate, TupleTableSlot *slot) +{ + bool skip_tuple = false; + Relation rel = resultRelInfo->ri_RelationDesc; + + /* For now we support only tables. */ + Assert(rel->rd_rel->relkind == RELKIND_RELATION); + + CheckCmdReplicaIdentity(rel, CMD_INSERT); + + /* BEFORE ROW INSERT Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row) + { + if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) + skip_tuple = true; /* "do nothing" */ + } + + if (!skip_tuple) + { + /* Compute stored generated columns */ + if (rel->rd_att->constr && + rel->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, slot, + CMD_INSERT); + + /* Check the constraints of the tuple */ + if (rel->rd_att->constr) + ExecConstraints(resultRelInfo, slot, estate); + if (rel->rd_rel->relispartition) + ExecPartitionCheck(resultRelInfo, slot, estate, true); + + table_modify_buffer_insert(MultiInsertState, slot); + } +} + /* * Find the searchslot tuple and update it with data in the slot, * update the indexes, and execute any constraints and per-row triggers. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 95c09c9516..46d38aebd2 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_tuple(out, rel, newslot, binary, columns); } +LogicalRepRelId +logicalrep_read_relid(StringInfo in) +{ + LogicalRepRelId relid; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + return relid; +} + +void +logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup) +{ + char action; + + action = pq_getmsgbyte(in); + if (action != 'N') + elog(ERROR, "expected new tuple but got %d", + action); + + logicalrep_read_tuple(in, newtup); +} + /* * Read INSERT from stream. * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..d62772f590 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -148,7 +148,6 @@ #include #include "access/table.h" -#include "access/tableam.h" #include "access/twophase.h" #include "access/xact.h" #include "catalog/indexing.h" @@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +typedef enum LRMultiInsertReturnStatus +{ + LR_MULTI_INSERT_NONE = 0, + LR_MULTI_INSERT_REL_SKIPPED, + LR_MULTI_INSERT_DISALLOWED, + LR_MULTI_INSERT_DONE, +} LRMultiInsertReturnStatus; + +static TableModifyState *MultiInsertState = NULL; +static LogicalRepRelMapEntry *LastRel = NULL; +static LogicalRepRelId LastMultiInsertRelId = InvalidOid; +static ApplyExecutionData *LastEData = NULL; +static TupleTableSlot *LastRemoteSlot = NULL; + +typedef struct LRModifyBufferFlushContext +{ + ResultRelInfo *resultRelInfo; + EState *estate; +} LRModifyBufferFlushContext; + +static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL; +static void LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot); +static void FinishMultiInserts(void); + /* * Form the origin name for the subscription. * @@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + FinishMultiInserts(); + logicalrep_read_commit(s, &commit_data); if (commit_data.commit_lsn != remote_final_lsn) @@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s) { LogicalRepPreparedTxnData begin_data; + FinishMultiInserts(); + /* Tablesync should never receive prepare. */ if (am_tablesync_worker()) ereport(ERROR, @@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; + FinishMultiInserts(); + logicalrep_read_prepare(s, &prepare_data); if (prepare_data.prepare_lsn != remote_final_lsn) @@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s) LogicalRepCommitPreparedTxnData prepare_data; char gid[GIDSIZE]; + FinishMultiInserts(); + logicalrep_read_commit_prepared(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); @@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s) LogicalRepRollbackPreparedTxnData rollback_data; char gid[GIDSIZE]; + FinishMultiInserts(); + logicalrep_read_rollback_prepared(s, &rollback_data); set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn); @@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s) /* Save the message before it is consumed. */ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); + FinishMultiInserts(); + /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s) static void apply_handle_origin(StringInfo s) { + FinishMultiInserts(); + /* * ORIGIN message can only come inside streaming transaction or inside * remote transaction and before any actual writes. @@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s) /* Save the message before it is consumed. */ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + FinishMultiInserts(); + if (!in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s) StringInfoData original_msg = *s; bool toplevel_xact; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s) /* Save the message before it is consumed. */ StringInfoData original_msg = *s; + FinishMultiInserts(); + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, commit_data.commit_lsn); + FinishMultiInserts(); + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ @@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + FinishMultiInserts(); + if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; @@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; + FinishMultiInserts(); + if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) return; @@ -2363,16 +2416,126 @@ TargetPrivilegesCheck(Relation rel, AclMode mode) RelationGetRelationName(rel)))); } -/* - * Handle INSERT message. - */ +static void +FinishMultiInserts(void) +{ + LogicalRepMsgType saved_command; + + if (MultiInsertState == NULL) + return; + + Assert(OidIsValid(LastMultiInsertRelId)); + Assert(LastEData != NULL); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = LastRel; + + /* Set current command for error callback */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT; + + ExecDropSingleTupleTableSlot(LastRemoteSlot); + LastRemoteSlot = NULL; + + table_modify_end(MultiInsertState); + MultiInsertState = NULL; + LastMultiInsertRelId = InvalidOid; + + pfree(modify_buffer_flush_context); + modify_buffer_flush_context = NULL; + + ExecCloseIndices(LastEData->targetRelInfo); + + finish_edata(LastEData); + LastEData = NULL; + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; + + logicalrep_rel_close(LastRel, NoLock); + LastRel = NULL; + + end_replication_step(); +} static void -apply_handle_insert(StringInfo s) +LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot) +{ + LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + LogicalRepMsgType saved_command; + + /* Quick exit if no indexes or no triggers */ + if (!(resultRelInfo->ri_NumIndices > 0 || + (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)))) + return; + + /* Set relation for error callback */ + apply_error_callback_arg.rel = LastRel; + + /* Set current command for error callback */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT; + + /* Caller must take care of opening and closing the indices */ + + /* + * If there are any indexes, update them for all the inserted tuples, and + * run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, false); + + ExecARInsertTriggers(estate, resultRelInfo, + slot, recheckIndexes, + NULL); + + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT triggers + * anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + ExecARInsertTriggers(estate, resultRelInfo, + slot, NIL, + NULL); + } + + /* + * XXX we should in theory pass a TransitionCaptureState object to the + * above to capture transition tuples, but after statement triggers don't + * actually get fired by replication yet anyway + */ + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; +} + +static LRMultiInsertReturnStatus +do_multi_inserts(StringInfo s, LogicalRepRelId *relid) { LogicalRepRelMapEntry *rel; LogicalRepTupleData newtup; - LogicalRepRelId relid; UserContext ucxt; ApplyExecutionData *edata; EState *estate; @@ -2380,17 +2543,143 @@ apply_handle_insert(StringInfo s) MemoryContext oldctx; bool run_as_owner; + if (MultiInsertState == NULL) + begin_replication_step(); + + *relid = logicalrep_read_relid(s); + + if (MultiInsertState != NULL && + (LastMultiInsertRelId != InvalidOid && + *relid != InvalidOid && + LastMultiInsertRelId != *relid)) + FinishMultiInserts(); + + if (MultiInsertState == NULL) + rel = logicalrep_rel_open(*relid, RowExclusiveLock); + else + rel = LastRel; + + if (!should_apply_changes_for_rel(rel)) + { + Assert(MultiInsertState == NULL); + + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return LR_MULTI_INSERT_REL_SKIPPED; + } + + /* For a partitioned table, let's not do multi inserts. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + Assert(MultiInsertState == NULL); + + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return LR_MULTI_INSERT_DISALLOWED; + } + /* - * Quick return if we are skipping data modification changes or handling - * streamed transactions. + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. */ - if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) - return; + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + + if (MultiInsertState == NULL) + { + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize the executor state. */ + LastEData = edata = create_edata_for_relation(rel); + estate = edata->estate; + + LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel), + &TTSOpsVirtual); + + modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext)); + modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo; + modify_buffer_flush_context->estate = estate; + + MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + 0, + LRModifyBufferFlushCallback, + modify_buffer_flush_context); + LastRel = rel; + LastMultiInsertRelId = *relid; + + /* We must open indexes here. */ + ExecOpenIndices(edata->targetRelInfo, false); + + MemoryContextSwitchTo(oldctx); + } + else + { + CommandId cid; + + edata = LastEData; + estate = edata->estate; + ResetExprContext(GetPerTupleExprContext(estate)); + ExecClearTuple(LastRemoteSlot); + remoteslot = LastRemoteSlot; + cid = GetCurrentCommandId(true); + MultiInsertState->cid = cid; + estate->es_output_cid = cid; + } + + /* Process and store remote tuple in the slot */ + logicalrep_read_insert_v2(s, &newtup); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, rel, &newtup); + slot_fill_defaults(rel, estate, remoteslot); + MemoryContextSwitchTo(oldctx); + + TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT); + ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot); + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + Assert(MultiInsertState != NULL); + + CommandCounterIncrement(); + + return LR_MULTI_INSERT_DONE; +} + +static bool +do_single_inserts(StringInfo s, LogicalRepRelId relid) +{ + LogicalRepRelMapEntry *rel; + LogicalRepTupleData newtup; + UserContext ucxt; + ApplyExecutionData *edata; + EState *estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; + bool run_as_owner; + + Assert(relid != InvalidOid); begin_replication_step(); - relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { @@ -2400,7 +2689,7 @@ apply_handle_insert(StringInfo s) */ logicalrep_rel_close(rel, RowExclusiveLock); end_replication_step(); - return; + return false; } /* @@ -2422,6 +2711,7 @@ apply_handle_insert(StringInfo s) &TTSOpsVirtual); /* Process and store remote tuple in the slot */ + logicalrep_read_insert_v2(s, &newtup); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); @@ -2446,6 +2736,35 @@ apply_handle_insert(StringInfo s) logicalrep_rel_close(rel, NoLock); end_replication_step(); + + return true; +} + +/* + * Handle INSERT message. + */ +static void +apply_handle_insert(StringInfo s) +{ + LRMultiInsertReturnStatus mi_status; + LogicalRepRelId relid; + + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + return; + + mi_status = do_multi_inserts(s, &relid); + if (mi_status == LR_MULTI_INSERT_REL_SKIPPED || + mi_status == LR_MULTI_INSERT_DONE) + return; + + do_single_inserts(s, relid); + + return; } /* @@ -2532,6 +2851,8 @@ apply_handle_update(StringInfo s) MemoryContext oldctx; bool run_as_owner; + FinishMultiInserts(); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. @@ -2713,6 +3034,8 @@ apply_handle_delete(StringInfo s) MemoryContext oldctx; bool run_as_owner; + FinishMultiInserts(); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. @@ -3154,6 +3477,8 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; + FinishMultiInserts(); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9770752ea3..8f10ea977b 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -14,6 +14,7 @@ #ifndef EXECUTOR_H #define EXECUTOR_H +#include "access/tableam.h" #include "executor/execdesc.h" #include "fmgr.h" #include "nodes/lockoptions.h" @@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); +extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState, + ResultRelInfo *resultRelInfo, + EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index c409638a2e..3f3a7f0a31 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns); +extern LogicalRepRelId logicalrep_read_relid(StringInfo in); +extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 57aabf51d8..9582503bb4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1454,6 +1454,8 @@ LPTHREAD_START_ROUTINE LPTSTR LPVOID LPWSTR +LRModifyBufferFlushContext +LRMultiInsertReturnStatus LSEG LUID LVRelState -- 2.34.1