Re: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: Time delayed LR (WAS Re: logical replication restrictions) |
Date | |
Msg-id | CAHut+Punm+_Vq7aosEuDj17Kny9Td636HE7R2-LaDZ2+FCrGpg@mail.gmail.com |
In response to | RE: Time delayed LR (WAS Re: logical replication restrictions) ("Takamichi Osumi (Fujitsu)" <osumi.takamichi@fujitsu.com>) |
List | pgsql-hackers |
Here are my review comments for the v34 patch.

======
src/backend/replication/logical/worker.c

1.
+/* The last time we send a feedback message */
+static TimestampTz send_time = 0;
+

IMO this is a bad variable name. When this variable was changed to be global, it ought to have been renamed. The name "send_time" is almost meaningless without any contextual information.

It is also bad because this global name is "shadowed" by several parameters and local variables that use the same name (e.g. see UpdateWorkerStats, LogicalRepApplyLoop, etc.). It is too confusing.

How about using a unique, meaningful name with a matching comment, to improve readability and remove the unwanted shadowing?

SUGGESTION
/* Timestamp of when the last feedback message was sent. */
static TimestampTz last_sent_feedback_ts = 0;

~~~

2. maybe_apply_delay

+	/* Apply the delay by the latch mechanism */
+	do
+	{
+		TimestampTz delayUntil;
+		long		diffms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* This might change wal_receiver_status_interval */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/*
+		 * Before calculating the time duration, reload the catalog if needed.
+		 */
+		if (!in_remote_transaction && !in_streamed_transaction)
+		{
+			AcceptInvalidationMessages();
+			maybe_reread_subscription();
+		}
+
+		delayUntil = TimestampTzPlusMilliseconds(finish_ts, MySubscription->minapplydelay);
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delayUntil);
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %d ms, remaining wait time: %ld ms",
+			 xid, MySubscription->minapplydelay, diffms);
+
+		/*
+		 * Call send_feedback() to prevent the publisher from exiting by
+		 * timeout during the delay, when the status interval is greater than
+		 * zero.
+		 */
+		if (!status_interval_ms)
+		{
+			TimestampTz nextFeedback;
+
+			/*
+			 * Based on the last time when we send a feedback message, adjust
+			 * the first delay time for this transaction. This ensures that
+			 * the first feedback message follows wal_receiver_status_interval
+			 * interval.
+			 */
+			nextFeedback = TimestampTzPlusMilliseconds(send_time,
+													   wal_receiver_status_interval * 1000L);
+			status_interval_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), nextFeedback);
+		}
+		else
+			status_interval_ms = wal_receiver_status_interval * 1000L;
+
+		if (status_interval_ms > 0 && diffms > status_interval_ms)
+		{
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  status_interval_ms,
+					  WAIT_EVENT_LOGICAL_APPLY_DELAY);
+			send_feedback(last_received, true, false, true);
+		}
+		else
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  diffms,
+					  WAIT_EVENT_LOGICAL_APPLY_DELAY);
+
+	} while (true);

~

IMO this logic has been tweaked too many times without revisiting the variable names and logic from scratch, so it has become over-complex:
- some variables take on multiple meanings
- multiple "* 1000L" multiplications have crept back in again
- the name 'diffms' is too generic now that there are so many variables, so it has lost its meaning
- GetCurrentTimestamp() is called in multiple places

SUGGESTIONS
- rename some variables and simplify the logic
- reduce all the if/else
- don't be sneaky with the meaning of status_interval_ms
- 'diffms' --> 'remaining_delay_ms'
- 'delayUntil' --> 'delay_until_ts'
- introduce a 'now' variable
- simplify the feedback check to (next_feedback_due_ms < remaining_delay_ms)

SUGGESTION (WFM)

	/* Apply the delay by the latch mechanism */
	while (true)
	{
		TimestampTz now;
		TimestampTz delay_until_ts;
		long		remaining_delay_ms;
		long		status_interval_ms;

		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* This might change wal_receiver_status_interval */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/*
		 * Before calculating the time duration, reload the catalog if needed.
		 */
		if (!in_remote_transaction && !in_streamed_transaction)
		{
			AcceptInvalidationMessages();
			maybe_reread_subscription();
		}

		now = GetCurrentTimestamp();
		delay_until_ts = TimestampTzPlusMilliseconds(finish_ts, MySubscription->minapplydelay);
		remaining_delay_ms = TimestampDifferenceMilliseconds(now, delay_until_ts);

		/*
		 * Exit without arming the latch if it's already past time to apply
		 * this transaction.
		 */
		if (remaining_delay_ms <= 0)
			break;

		elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %d ms, remaining wait time: %ld ms",
			 xid, MySubscription->minapplydelay, remaining_delay_ms);

		/*
		 * If a status interval is defined then we may need to call send_feedback()
		 * early to prevent the publisher from exiting during a long apply delay.
		 */
		status_interval_ms = wal_receiver_status_interval * 1000L;
		if (status_interval_ms > 0)
		{
			TimestampTz next_feedback_due_ts;
			long		next_feedback_due_ms;

			/*
			 * Find if the next feedback is due earlier than the remaining delay ms.
			 */
			next_feedback_due_ts = TimestampTzPlusMilliseconds(send_time, status_interval_ms);
			next_feedback_due_ms = TimestampDifferenceMilliseconds(now, next_feedback_due_ts);

			if (next_feedback_due_ms < remaining_delay_ms)
			{
				/* delay before feedback */
				WaitLatch(MyLatch,
						  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						  next_feedback_due_ms,
						  WAIT_EVENT_LOGICAL_APPLY_DELAY);
				send_feedback(last_received, true, false, true);
				continue;
			}
		}

		/* delay before apply */
		WaitLatch(MyLatch,
				  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
				  remaining_delay_ms,
				  WAIT_EVENT_LOGICAL_APPLY_DELAY);
	}

======
src/include/utils/wait_event.h

3.
@@ -149,7 +149,8 @@ typedef enum
 	WAIT_EVENT_REGISTER_SYNC_REQUEST,
 	WAIT_EVENT_SPIN_DELAY,
 	WAIT_EVENT_VACUUM_DELAY,
-	WAIT_EVENT_VACUUM_TRUNCATE
+	WAIT_EVENT_VACUUM_TRUNCATE,
+	WAIT_EVENT_LOGICAL_APPLY_DELAY
 } WaitEventTimeout;

FYI - The PGDOCS has a section with "Table 28.13. Wait Events of Type Timeout", so if you are going to add a new Timeout event then you also need to document it (alphabetically) in that table.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
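The wait selection in the SUGGESTION loop for comment 2 reduces to one decision per iteration: if the next feedback is due before the apply delay expires, sleep until then and send feedback; otherwise sleep for the whole remaining delay. Below is a minimal standalone model of that decision, not PostgreSQL code. It assumes plain millisecond timestamps instead of TimestampTz (which counts microseconds), replaces WaitLatch() with a simulated clock that always runs to the full timeout, and uses hypothetical names (diff_ms, DelayStep, plan_delay_step, simulate_delay). diff_ms clamps negative differences to zero, as TimestampDifferenceMilliseconds() does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model type: timestamps as plain milliseconds, not TimestampTz. */
typedef int64_t ts_ms;

/* Like TimestampDifferenceMilliseconds(): 0 when stop is not after start. */
static int64_t
diff_ms(ts_ms start, ts_ms stop)
{
	return (stop > start) ? stop - start : 0;
}

/* Hypothetical: one loop iteration's decision. */
typedef struct
{
	bool		apply_now;		/* the apply delay has already elapsed */
	int64_t		wait_ms;		/* latch timeout for this iteration */
	bool		send_feedback;	/* send feedback after the wait */
} DelayStep;

static DelayStep
plan_delay_step(ts_ms now, ts_ms finish_ts, int64_t min_apply_delay_ms,
				ts_ms last_sent_feedback_ts, int64_t status_interval_ms)
{
	DelayStep	step = {false, 0, false};
	int64_t		remaining_delay_ms;

	assert(min_apply_delay_ms >= 0 && status_interval_ms >= 0);

	remaining_delay_ms = diff_ms(now, finish_ts + min_apply_delay_ms);
	if (remaining_delay_ms <= 0)
	{
		step.apply_now = true;
		return step;
	}

	/* Wake up early only if feedback is due before the delay expires. */
	if (status_interval_ms > 0)
	{
		int64_t		next_feedback_due_ms =
			diff_ms(now, last_sent_feedback_ts + status_interval_ms);

		if (next_feedback_due_ms < remaining_delay_ms)
		{
			step.wait_ms = next_feedback_due_ms;
			step.send_feedback = true;
			return step;
		}
	}

	step.wait_ms = remaining_delay_ms;
	return step;
}

/*
 * Hypothetical driver: every wait is assumed to run to its full timeout
 * (no early latch wakeups). Returns the number of feedback messages sent;
 * *apply_at receives the simulated time at which the apply may start.
 */
static int
simulate_delay(ts_ms now, ts_ms finish_ts, int64_t min_apply_delay_ms,
			   ts_ms last_sent_feedback_ts, int64_t status_interval_ms,
			   ts_ms *apply_at)
{
	int			feedback_count = 0;

	for (;;)
	{
		DelayStep	step = plan_delay_step(now, finish_ts, min_apply_delay_ms,
										   last_sent_feedback_ts,
										   status_interval_ms);

		if (step.apply_now)
			break;

		now += step.wait_ms;
		if (step.send_feedback)
		{
			last_sent_feedback_ts = now;
			feedback_count++;
		}
	}

	*apply_at = now;
	return feedback_count;
}
```

For example, with a 25 s apply delay, a 10 s status interval and the last feedback sent at time 0, simulate_delay(0, 0, 25000, 0, 10000, &at) reports two feedback messages (at 10 s and 20 s) and sets at to 25000. Because the difference helper clamps at zero, an already-overdue feedback in this model yields a zero-length wait followed immediately by feedback, so the single `next_feedback_due_ms < remaining_delay_ms` comparison covers that case too.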