From 9f166c859a86f4f6e3803cbb862cd189168b07b6 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sun, 11 Dec 2022 19:08:17 +0800 Subject: [PATCH v61 2/2] Serialize partial changes to a file when the attempt to send data times out. In patch 0001 if the leader apply worker times out while attempting to send a message to the parallel apply worker it results in an ERROR. This patch (0002) modifies that behaviour, so instead of erroring it will switch to "partial serialize" mode - in this mode the leader serializes all remaining changes to a file and notifies the parallel apply workers to read and apply them at the end of the transaction. --- .../replication/logical/applyparallelworker.c | 220 ++++++++- src/backend/replication/logical/worker.c | 502 ++++++++++++++++----- src/include/replication/worker_internal.h | 55 ++- src/tools/pgindent/typedefs.list | 1 + 4 files changed, 634 insertions(+), 144 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index c81fcea..11695c7 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -137,9 +137,11 @@ * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to * wait to send messages, and this wait doesn't appear in lmgr. * - * To resolve this issue, we use non-blocking write and wait with a timeout. If - * the timeout is exceeded, the LA reports an error and restarts logical - * replication. + * To avoid this wait, we use a non-blocking write and wait with a timeout. If + * the timeout is exceeded, the LA will serialize the message to a file and + * indicate PA-2 that it needs to read that file for the remaining messages. + * Then LA will start waiting for commit which will detect deadlock if any. + * See pa_send_data() and enum TransApplyAction. 
* * 4) Lock types * @@ -252,6 +254,7 @@ static List *subxactlist = NIL; static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); +static PartialFileSetState pa_get_fileset_state(void); /* * Returns true if it is OK to start a parallel apply worker, false otherwise. @@ -494,9 +497,11 @@ pa_allocate_worker(TransactionId xid) SpinLockAcquire(&winfo->shared->mutex); winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; winfo->shared->xid = xid; + winfo->shared->fileset_state = FS_UNKNOWN; SpinLockRelease(&winfo->shared->mutex); winfo->in_use = true; + winfo->serialize_changes = false; entry->winfo = winfo; entry->xid = xid; } @@ -570,8 +575,17 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); - /* Stop the worker if there are enough workers in the pool. */ - if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) + /* + * Stop the worker if there are enough workers in the pool. + * + * XXX The worker is also stopped if the leader apply worker needed to + * serialize part of the transaction data due to a send timeout. This is + * because the message could be partially written to the queue but there is + * no way to clean the queue other than resending the message until it + * succeeds. Directly stopping the worker avoids needing this complexity. + */ + if (winfo->serialize_changes || + napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) { int slot_no; uint16 generation; @@ -589,11 +603,15 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) } winfo->in_use = false; + winfo->serialize_changes = false; return false; } -/* Free the parallel apply worker information. */ +/* + * Free the parallel apply worker information and unlink the files with + * serialized changes if any. 
+ */ static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo) { @@ -605,6 +623,10 @@ pa_free_worker_info(ParallelApplyWorkerInfo *winfo) if (winfo->error_mq_handle) shm_mq_detach(winfo->error_mq_handle); + /* Unlink the files with serialized changes. */ + if (winfo->serialize_changes) + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); + if (winfo->dsm_seg) dsm_detach(winfo->dsm_seg); @@ -684,6 +706,73 @@ pa_stop_idle_workers(void) } /* + * Check if there are any spooled messages pending to be applied by the + * parallel apply worker. + */ +static bool +pa_has_spooled_message_pending(void) +{ + PartialFileSetState fileset_state; + + fileset_state = pa_get_fileset_state(); + + if (fileset_state != FS_UNKNOWN) + return true; + else + return false; +} + +/* + * Replay the spooled messages in the parallel apply worker if the leader apply + * worker has finished serializing changes to the file. + */ +static void +pa_spooled_messages(void) +{ + PartialFileSetState fileset_state; + + fileset_state = pa_get_fileset_state(); + + /* + * If the leader apply worker is busy serializing the partial changes then + * acquire the stream lock now and wait for the leader worker to finish + * serializing the changes. Otherwise, the parallel apply worker won't get + * a chance to receive a STREAM_STOP (and acquire the stream lock) until + * the leader has serialized all changes, which can lead to an undetected + * deadlock. + * + * XXX It is possible that immediately after we have waited for a lock in + * the FS_SERIALIZE_IN_PROGRESS state, the fileset state becomes + * FS_SERIALIZE_DONE but re-checking it again doesn't seem worth it. + * Anyway, next time when this function is invoked, we will set the state + * to FS_READY. 
+ */ + if (fileset_state == FS_SERIALIZE_IN_PROGRESS) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + } + + /* + * We cannot read the file immediately after the leader has serialized all + * changes to the file because there may still be messages in the memory + * queue. We will apply all spooled messages the next time we call this + * function, which should ensure that there are no messages left in the + * memory queue. + */ + else if (fileset_state == FS_SERIALIZE_DONE) + { + pa_set_fileset_state(MyParallelShared, FS_READY); + } + else if (fileset_state == FS_READY) + { + apply_spooled_messages(&MyParallelShared->fileset, + MyParallelShared->xid, + InvalidXLogRecPtr); + pa_set_fileset_state(MyParallelShared, FS_UNKNOWN); + } +} + +/* * Interrupt handler for main loop of parallel apply worker. */ static void @@ -767,6 +856,11 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) /* * Ignore statistics fields that have been updated by the leader * apply worker. + * + * XXX We can avoid sending the statistics fields from the leader + * apply worker but for that, it needs to rebuild the entire + * message by removing these fields which could be more work than + * simply ignoring these fields in the parallel apply worker. */ s.cursor += SIZE_STATS_MESSAGE; @@ -774,16 +868,24 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { - int rc; + /* Check if changes have been serialized to a file. */ + if (pa_has_spooled_message_pending()) + { + pa_spooled_messages(); + } + else + { + int rc; - /* Wait for more work. */ - rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - 1000L, - WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN); + /* Wait for more work. 
*/ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN); - if (rc & WL_LATCH_SET) - ResetLatch(MyLatch); + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } } else { @@ -1099,8 +1201,11 @@ HandleParallelApplyMessages(void) /* * Send the data to the specified parallel apply worker via shared-memory * queue. + * + * Return false if the attempt to send data via shared memory times out, true + * otherwise. */ -void +bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) { int rc; @@ -1108,6 +1213,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) TimestampTz startTime = 0; Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) @@ -1117,7 +1223,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true); if (result == SHM_MQ_SUCCESS) - break; + return true; else if (result == SHM_MQ_DETACHED) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1137,23 +1243,48 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) CHECK_FOR_INTERRUPTS(); } - /* - * If the attempt to send data via shared memory times out, we restart - * the logical replication to prevent possible deadlocks with another - * parallel apply worker. Refer to the comments atop this file for - * details. 
- */ if (startTime == 0) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication parallel apply worker due to timeout"))); + return false; } } /* + * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means + * that the current data and any subsequent data for this transaction will be + * serialized to a file. This is done to prevent possible deadlocks with + * another parallel apply worker (refer to the comments atop + * applyparallelworker.c for details). + */ +void +pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, + bool stream_locked) +{ + /* + * The parallel apply worker could be stuck for some reason (say, + * waiting on some lock held by another backend), so stop trying to + * send data directly to it and start serializing data to a file + * instead. + */ + winfo->serialize_changes = true; + + /* Initialize the stream fileset. */ + stream_start_internal(winfo->shared->xid, true); + + /* + * Acquire the stream lock if not already held, to make sure that the + * parallel apply worker will wait for the leader to release the + * stream lock until the end of the transaction. + */ + if (!stream_locked) + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS); +} + +/* * Wait until the parallel apply worker's transaction state has reached or * exceeded the given xact_state. */ @@ -1406,6 +1537,45 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) } /* + * Set the fileset state for the given parallel apply worker. The fileset + * will be set once the leader worker has serialized all changes to the file + * so that it can be used by the parallel apply worker. 
+ */ +void +pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->fileset_state = fileset_state; + + if (fileset_state == FS_SERIALIZE_DONE) + { + Assert(am_leader_apply_worker()); + Assert(MyLogicalRepWorker->stream_fileset); + wshared->fileset = *MyLogicalRepWorker->stream_fileset; + } + + SpinLockRelease(&wshared->mutex); +} + +/* + * Get the fileset state for the given parallel apply worker. + */ +static PartialFileSetState +pa_get_fileset_state(void) +{ + PartialFileSetState fileset_state; + + Assert(am_parallel_apply_worker()); + + SpinLockAcquire(&MyParallelShared->mutex); + fileset_state = MyParallelShared->fileset_state; + SpinLockRelease(&MyParallelShared->mutex); + + return fileset_state; +} + +/* * Helper functions to acquire and release a lock for each stream block. * * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index dfc58a0..f03e3e7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -256,6 +256,18 @@ typedef struct ApplyErrorCallbackArg * worker. Changes are written to temporary files and then applied when the * final commit arrives. * + * TRANS_LEADER_PARTIAL_SERIALIZE: + * This action means that we are in the leader apply worker and have sent some + * changes directly to the parallel apply worker and the remaining changes are + * serialized to a file, due to timeout while sending data. The parallel apply + * worker will apply these serialized changes when the final commit arrives. + * + * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to + * serializing changes, the leader worker also needs to serialize the + * STREAM_XXX message to a file, and wait for the parallel apply worker to + * finish the transaction when processing the transaction finish command. 
So + * this new action was introduced to keep the code and logic clear. + * * TRANS_LEADER_SEND_TO_PARALLEL: * This action means that we are in the leader apply worker and need to send * the changes to the parallel apply worker. @@ -271,6 +283,7 @@ typedef enum /* Actions for streaming transactions. */ TRANS_LEADER_SERIALIZE, + TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY } TransApplyAction; @@ -353,7 +366,6 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; static inline void subxact_filename(char *path, Oid subid, TransactionId xid); -static inline void changes_filename(char *path, Oid subid, TransactionId xid); /* * Information about subtransactions of a given toplevel transaction. @@ -366,10 +378,9 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. */ -static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first_segment); -static void stream_write_change(char action, StringInfo s); +static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -399,9 +410,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -/* Common streaming function to apply all the spooled messages */ -static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -564,6 +572,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransactionId current_xid; ParallelApplyWorkerInfo *winfo; 
TransApplyAction apply_action; + StringInfoData original_msg; apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -574,6 +583,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) Assert(TransactionIdIsValid(stream_xid)); /* + * The parallel apply worker needs the xid in this message to decide + * whether to define a savepoint, so save the original message that has not + * moved the cursor after the xid. We will serialize this message to a file + * in PARTIAL_SERIALIZE mode. + */ + original_msg = *s; + + /* * We should have received XID of the subxact as the first part of the * message, so extract it. */ @@ -599,14 +616,27 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - pa_send_data(winfo, s->len, s->data); - /* * XXX The publisher side doesn't always send relation/type update * messages after the streaming transaction, so also update the * relation/type in leader apply worker. See function * cleanup_rel_sync_cache. */ + if (pa_send_data(winfo, s->len, s->data)) + return (action != LOGICAL_REP_MSG_RELATION && + action != LOGICAL_REP_MSG_TYPE); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + pa_switch_to_partial_serialize(winfo, false); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(action, &original_msg); + + /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */ return (action != LOGICAL_REP_MSG_RELATION && action != LOGICAL_REP_MSG_TYPE); @@ -1247,6 +1277,9 @@ apply_handle_stream_prepare(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. 
*/ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1271,7 +1304,8 @@ apply_handle_stream_prepare(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, + prepare_data.xid, prepare_data.prepare_lsn); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1289,14 +1323,42 @@ apply_handle_stream_prepare(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - /* Send STREAM PREPARE message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + if (pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + } + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(prepare_data.xid, + LOGICAL_REP_MSG_STREAM_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); /* Finish processing the streaming transaction. */ pa_xact_finish(winfo, prepare_data.end_lsn); break; case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before preparing. + */ + if (stream_fd) + stream_close_file(); + begin_replication_step(); /* Mark the transaction as prepared. */ @@ -1359,6 +1421,47 @@ apply_handle_origin(StringInfo s) } /* + * Initialize fileset (if not already done). + * + * Create a new file when first_segment is true, otherwise open the existing + * file. 
+ */ +void +stream_start_internal(TransactionId xid, bool first_segment) +{ + begin_replication_step(); + + /* + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. + */ + if (!MyLogicalRepWorker->stream_fileset) + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); + } + + /* Open the spool file for this transaction. */ + stream_open_file(MyLogicalRepWorker->subid, xid, first_segment); + + /* If this is not the first segment, open existing subxact file. */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, xid); + + end_replication_step(); +} + +/* * Handle STREAM START message. */ static void @@ -1368,6 +1471,9 @@ apply_handle_stream_start(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1397,65 +1503,63 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SERIALIZE: /* - * Start a transaction on stream start, this transaction will be - * committed on the stream stop unless it is a tablesync worker in - * which case it will be committed after processing all the - * messages. We need the transaction for handling the buffile, - * used for serializing the streaming data and subxact info. + * Function stream_start_internal starts a transaction. 
This + * transaction will be committed on the stream stop unless it is a + * tablesync worker in which case it will be committed after + * processing all the messages. We need this transaction for + * handling the BufFile, used for serializing the streaming data + * and subxact info. */ - begin_replication_step(); + stream_start_internal(stream_xid, first_segment); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); /* - * Initialize the worker's stream_fileset if we haven't yet. This - * will be used for the entire duration of the worker so create it - * in a permanent context. We create this on the very first - * streaming message from any transaction and then use it for this - * and other streaming transactions. Now, we could create a - * fileset at the start of the worker as well but then we won't be - * sure that it will ever be used. + * Once we start serializing the changes, the parallel apply worker + * will wait for the leader to release the stream lock until the + * end of the transaction. So, we don't need to release the lock or + * increment the stream count in that case. */ - if (!MyLogicalRepWorker->stream_fileset) + if (pa_send_data(winfo, s->len, s->data)) { - MemoryContext oldctx; - - oldctx = MemoryContextSwitchTo(ApplyContext); + /* + * Unlock the shared object lock so that the parallel apply + * worker can continue to receive changes. + */ + if (!first_segment) + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); + /* + * Increment the number of streaming blocks waiting to be + * processed by parallel apply worker. + */ + pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); - MemoryContextSwitchTo(oldctx); + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); + break; } - /* Open the spool file for this transaction. 
*/ - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); - - /* If this is not the first segment, open existing subxact file. */ - if (!first_segment) - subxact_info_read(MyLogicalRepWorker->subid, stream_xid); - - end_replication_step(); - break; - - case TRANS_LEADER_SEND_TO_PARALLEL: - Assert(winfo); - /* - * Unlock the shared object lock so that the parallel apply worker - * can continue to receive changes. + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. */ - if (!first_segment) - pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + pa_switch_to_partial_serialize(winfo, !first_segment); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: /* - * Increment the number of streaming blocks waiting to be - * processed by parallel apply worker. + * Open the spool file unless it was already opened when switching + * to serialize mode. The transaction started in + * stream_start_internal will be committed on the stream stop. */ - pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); + if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL) + stream_start_internal(stream_xid, first_segment); - pa_send_data(winfo, s->len, s->data); - - /* Cache the parallel apply worker for this transaction. */ - pa_set_stream_apply_worker(winfo); + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg); break; case TRANS_PARALLEL_APPLY: @@ -1478,6 +1582,33 @@ apply_handle_stream_start(StringInfo s) } /* + * Update the information about subxacts and close the file. + * + * This function should be called when the stream_start_internal function has + * been called. + */ +void +stream_stop_internal(TransactionId xid) +{ + /* + * Serialize information about subxacts for the toplevel transaction, then + * close the stream messages spool file. 
+ */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + stream_close_file(); + + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle STREAM STOP message. */ static void @@ -1496,24 +1627,7 @@ apply_handle_stream_stop(StringInfo s) switch (apply_action) { case TRANS_LEADER_SERIALIZE: - - /* - * Close the file with serialized changes, and serialize - * information about subxacts for the toplevel transaction. - */ - subxact_info_write(MyLogicalRepWorker->subid, stream_xid); - stream_close_file(); - - /* We must be in a valid transaction state */ - Assert(IsTransactionState()); - - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - - /* Reset per-stream context */ - MemoryContextReset(LogicalStreamingContext); - - pgstat_report_activity(STATE_IDLE, NULL); + stream_stop_internal(stream_xid); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1527,11 +1641,24 @@ apply_handle_stream_stop(StringInfo s) */ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); - pa_send_data(winfo, s->len, s->data); + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(NULL); + pgstat_report_activity(STATE_IDLE, NULL); + break; + } - pa_set_stream_apply_worker(NULL); + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. 
+ */ + pa_switch_to_partial_serialize(winfo, true); - pgstat_report_activity(STATE_IDLE, NULL); + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s); + stream_stop_internal(stream_xid); + pa_set_stream_apply_worker(NULL); break; case TRANS_PARALLEL_APPLY: @@ -1670,6 +1797,9 @@ apply_handle_stream_abort(StringInfo s) LogicalRepStreamAbortData abort_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; bool toplevel_xact; if (in_streamed_transaction) @@ -1704,12 +1834,6 @@ apply_handle_stream_abort(StringInfo s) Assert(winfo); /* - * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. - */ - pa_unlock_stream(xid, AccessExclusiveLock); - - /* * For the case of aborting the subtransaction, we increment the * number of streaming blocks and take the lock again before * sending the STREAM_ABORT to ensure that the parallel apply @@ -1725,10 +1849,11 @@ apply_handle_stream_abort(StringInfo s) * applyparallelworker.c. */ if (!toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); - - /* Send STREAM ABORT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + pa_lock_stream(xid, AccessExclusiveLock); + } /* * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't wait for the @@ -1736,11 +1861,52 @@ apply_handle_stream_abort(StringInfo s) * maintain the commit order and won't have the risk of failures * due to transaction dependencies and deadlocks. */ + if (pa_send_data(winfo, s->len, s->data)) + { + if (toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); + (void) pa_free_worker(winfo, xid); + } + + break; + } + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. 
+ */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Parallel apply worker might have applied some changes, so write + * the STREAM_ABORT message so that it can rollback the + * subtransaction if needed. + */ + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT, + &original_msg); + if (toplevel_xact) + { + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + pa_unlock_stream(xid, AccessExclusiveLock); (void) pa_free_worker(winfo, xid); + } break; case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before aborting. + */ + if (toplevel_xact && stream_fd) + stream_close_file(); + pa_stream_abort(&abort_data); /* @@ -1766,19 +1932,55 @@ apply_handle_stream_abort(StringInfo s) } /* - * Common spoolfile processing. + * Ensure that the passed location is fileset's end. */ static void -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) +ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, + off_t offset) +{ + char path[MAXPGPATH]; + BufFile *fd; + int last_fileno; + off_t last_offset; + + Assert(!IsTransactionState()); + + begin_replication_step(); + + changes_filename(path, MyLogicalRepWorker->subid, xid); + + fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + BufFileSeek(fd, 0, 0, SEEK_END); + BufFileTell(fd, &last_fileno, &last_offset); + + BufFileClose(fd); + + end_replication_step(); + + if (last_fileno != fileno || last_offset != offset) + elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"", + path); +} + +/* + * Common spoolfile processing. 
+ */ +void +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn) { StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; MemoryContext oldcxt; - BufFile *fd; + ResourceOwner oldowner; + int fileno; + off_t offset; - maybe_start_skipping_changes(lsn); + if (!am_parallel_apply_worker()) + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -1794,8 +1996,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, - false); + /* + * Make sure the file is owned by the toplevel transaction so that the file + * will not be accidentally closed when aborting a subtransaction. + */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + CurrentResourceOwner = oldowner; buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -1826,7 +2036,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) CHECK_FOR_INTERRUPTS(); /* read length of the on-disk record */ - nbytes = BufFileRead(fd, &len, sizeof(len)); + nbytes = BufFileRead(stream_fd, &len, sizeof(len)); /* have we reached end of the file? 
*/ if (nbytes == 0) @@ -1847,12 +2057,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) buffer = repalloc(buffer, len); /* and finally read the data into the buffer */ - if (BufFileRead(fd, buffer, len) != len) + if (BufFileRead(stream_fd, buffer, len) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from streaming transaction's changes file \"%s\": %m", path))); + BufFileTell(stream_fd, &fileno, &offset); + /* copy the buffer to the stringinfo and call apply_dispatch */ resetStringInfo(&s2); appendBinaryStringInfo(&s2, buffer, len); @@ -1868,15 +2080,24 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) nchanges++; + /* + * It is possible the file has been closed because we have processed + * the transaction end message like stream_commit in which case that + * must be the last message. + */ + if (!stream_fd) + { + ensure_last_message(stream_fileset, xid, fileno, offset); + break; + } + if (nchanges % 1000 == 0) elog(DEBUG1, "replayed %d changes from file \"%s\"", nchanges, path); } - BufFileClose(fd); - - pfree(buffer); - pfree(s2.data); + if (stream_fd) + stream_close_file(); elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); @@ -1895,6 +2116,9 @@ apply_handle_stream_commit(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1913,7 +2137,8 @@ apply_handle_stream_commit(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. 
*/ - apply_spooled_messages(xid, commit_data.commit_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, + commit_data.commit_lsn); apply_handle_commit_internal(&commit_data); @@ -1924,14 +2149,41 @@ apply_handle_stream_commit(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - /* Send STREAM COMMIT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + if (pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + } + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); /* Finish processing the streaming transaction. */ pa_xact_finish(winfo, commit_data.end_lsn); break; case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + apply_handle_commit_internal(&commit_data); MyParallelShared->last_commit_end = XactLastCommitEnd; @@ -3854,7 +4106,7 @@ subxact_filename(char *path, Oid subid, TransactionId xid) } /* format filename for file containing serialized changes */ -static inline void +void changes_filename(char *path, Oid subid, TransactionId xid) { snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); @@ -3868,7 +4120,7 @@ changes_filename(char *path, Oid subid, TransactionId xid) * toplevel transaction. Each subscription has a separate set of files * for any toplevel transaction. 
*/ -static void +void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; @@ -3891,9 +4143,6 @@ stream_cleanup_files(Oid subid, TransactionId xid) * by stream_xid (global variable). If it's the first chunk of streamed * changes for this transaction, create the buffile, otherwise open the * previously created file. - * - * This can only be called at the beginning of a "streaming" block, i.e. - * between stream_start/stream_stop messages from the upstream. */ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) @@ -3901,7 +4150,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) char path[MAXPGPATH]; MemoryContext oldcxt; - Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); @@ -3940,15 +4188,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) /* * stream_close_file * Close the currently open file with streamed changes. - * - * This can only be called at the end of a streaming block, i.e. at stream_stop - * message from the upstream. */ static void stream_close_file(void) { - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); BufFileClose(stream_fd); @@ -3965,13 +4208,11 @@ stream_close_file(void) * the length), action code (identifying the message type) and message * contents (without the subxact TransactionId value). */ -static void +void stream_write_change(char action, StringInfo s) { int len; - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); /* total on-disk size, including the action type character */ @@ -3990,6 +4231,26 @@ stream_write_change(char action, StringInfo s) } /* + * stream_open_and_write_change + * Serialize a message to a file for the given transaction. 
+ * + * This function is similar to stream_write_change except that it will open the + * target file if not already before writing the message and close file at the + * end. + */ +static void +stream_open_and_write_change(TransactionId xid, char action, StringInfo s) +{ + Assert(!in_streamed_transaction); + + if (!stream_fd) + stream_start_internal(xid, false); + + stream_write_change(action, s); + stream_stop_internal(xid); +} + +/* * Cleanup the memory for subxacts and reset the related variables. */ static inline void @@ -4660,7 +4921,8 @@ set_apply_error_context_origin(char *originname) /* * Return the action to take for the given transaction. *winfo is assigned to * the destination parallel worker info (if the action is - * TRANS_LEADER_SEND_TO_PARALLEL), otherwise *winfo is assigned NULL. + * TRANS_LEADER_SEND_TO_PARALLEL or TRANS_LEADER_PARTIAL_SERIALIZE), otherwise + * *winfo is assigned NULL. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4682,12 +4944,16 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) */ *winfo = pa_find_worker(xid); - if (*winfo) + if (!*winfo) { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_SERIALIZE; + } + else if ((*winfo)->serialize_changes) + { + return TRANS_LEADER_PARTIAL_SERIALIZE; } else { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4d3a6a1..b06651c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "miscadmin.h" #include "replication/logicalrelation.h" +#include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" #include "storage/shm_mq.h" @@ -95,6 +96,22 @@ typedef enum ParallelTransState } ParallelTransState; /* + * State of fileset in leader 
apply worker. + * + * FS_SERIALIZE_IN_PROGRESS means that the leader is serializing changes to the + * file. FS_SERIALIZE_DONE means that the leader has serialized all changes to + * the file. FS_READY means that it is now ok for a parallel apply worker read + * the file. + */ +typedef enum PartialFileSetState +{ + FS_UNKNOWN, + FS_SERIALIZE_IN_PROGRESS, + FS_SERIALIZE_DONE, + FS_READY +} PartialFileSetState; + +/* * Struct for sharing information between leader apply worker and parallel * apply workers. */ @@ -129,6 +146,19 @@ typedef struct ParallelApplyWorkerShared * the leader worker so it can update the lsn_mappings. */ XLogRecPtr last_commit_end; + + /* + * After entering PARTIAL_SERIALIZE mode, the leader apply worker will + * serialize changes to the file, and share the fileset with the parallel + * apply worker when processing the transaction finish command. Then the + * parallel apply worker will apply all the spooled messages. + * + * FileSet is used here instead of SharedFileSet because we need it to + * survive after releasing the shared memory so that the leader apply + * worker can re-use the same fileset for the next streaming transaction. + */ + PartialFileSetState fileset_state; + FileSet fileset; } ParallelApplyWorkerShared; /* @@ -147,6 +177,13 @@ typedef struct ParallelApplyWorkerInfo dsm_segment *dsm_seg; /* + * Indicates whether the leader apply worker needs to serialize the + * remaining changes to a file due to timeout when attempting to send data + * to the parallel apply worker via shared memory. + */ + bool serialize_changes; + + /* * True if the worker is being used to process a parallel apply * transaction. False indicates this worker is available for re-use. 
*/ @@ -199,10 +236,21 @@ extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); +extern void stream_start_internal(TransactionId xid, bool first_segment); +extern void stream_stop_internal(TransactionId xid); + +/* Common streaming function to apply all the spooled messages */ +extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn); + extern void apply_dispatch(StringInfo s); extern void maybe_reread_subscription(void); +extern void changes_filename(char *path, Oid subid, TransactionId xid); +extern void stream_write_change(char action, StringInfo s); +extern void stream_cleanup_files(Oid subid, TransactionId xid); + extern void InitializeApplyWorker(void); extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); @@ -218,8 +266,11 @@ extern bool pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid); extern void pa_detach_all_error_mq(void); extern void pa_stop_idle_workers(void); -extern void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, +extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); +extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, + bool stream_locked); + extern void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *wshared); extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared, @@ -230,6 +281,8 @@ extern void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid); extern void pa_reset_subtrans(void); extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data); +extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state); extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 
42c5c4d..19626fe 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1389,6 +1389,7 @@ LagTracker LargeObjectDesc LastAttnumInfo Latch +PartialFileSetState LerpFunc LexDescr LexemeEntry -- 2.7.2.windows.1