Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Synchronizing slots from primary to standby
Date
Msg-id CAHut+PtJAAPghc4GPt0k=jeMz1qu4H7mnaDifOHsVsMqi-qOLA@mail.gmail.com
In response to RE: Synchronizing slots from primary to standby  ("Zhijie Hou (Fujitsu)" <houzj.fnst@fujitsu.com>)
Responses Re: Synchronizing slots from primary to standby
List pgsql-hackers
Here are some review comments for patch v58-0002

(FYI - I quickly checked with the latest v59-0002 and AFAIK all these
review comments below are still relevant)

======
Commit message

1.
If a logical slot is invalidated on the primary, slot on the standby is also
invalidated.

~

/slot on the standby/then that slot on the standby/

======
doc/src/sgml/logicaldecoding.sgml

2.
In order to resume logical replication after failover from the synced
logical slots, it is required that 'conninfo' in subscriptions are
altered to point to the new primary server using ALTER SUBSCRIPTION
... CONNECTION. It is recommended that subscriptions are first
disabled before promoting the standby and are enabled back once these
are altered as above after failover.

~

Minor rewording mainly to reduce a long sentence.

SUGGESTION
To resume logical replication after failover from the synced logical
slots, the subscription's 'conninfo' must be altered to point to the
new primary server. This is done using ALTER SUBSCRIPTION ...
CONNECTION. It is recommended that subscriptions are first disabled
before promoting the standby and are enabled back after altering the
connection string.

======
doc/src/sgml/system-views.sgml

3.
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>synced</structfield> <type>bool</type>
+      </para>
+      <para>
+      True if this logical slot was synced from a primary server.
+      </para>
+      <para>

SUGGESTION
True if this is a logical slot that was synced from a primary server.

======
src/backend/access/transam/xlogrecovery.c

4.
+ /*
+ * Shutdown the slot sync workers to prevent potential conflicts between
+ * user processes and slotsync workers after a promotion.
+ *
+ * We do not update the 'synced' column from true to false here, as any
+ * failed update could leave some slot's 'synced' column as false. This
+ * could cause issues during slot sync after restarting the server as a
+ * standby. While updating after switching to the new timeline is an
+ * option, it does not simplify the handling for 'synced' column.
+ * Therefore, we retain the 'synced' column as true after promotion as they
+ * can provide useful information about their origin.
+ */

Minor comment wording changes.

BEFORE
...any failed update could leave some slot's 'synced' column as false.
SUGGESTION
...any failed update could leave 'synced' column false for some slots.

~

BEFORE
Therefore, we retain the 'synced' column as true after promotion as
they can provide useful information about their origin.
SUGGESTION
Therefore, we retain the 'synced' column as true after promotion as it
may provide useful information about the slot origin.

======
src/backend/replication/logical/slotsync.c

5.
+ * While creating the slot on physical standby, if the local restart_lsn and/or
+ * local catalog_xmin is ahead of those on the remote then the worker cannot
+ * create the local slot in sync with the primary server because that would
+ * mean moving the local slot backwards and the standby might not have WALs
+ * retained for old LSN. In this case, the worker will mark the slot as
+ * RS_TEMPORARY. Once the primary server catches up, it will move the slot to
+ * RS_PERSISTENT and will perform the sync periodically.

/will move the slot to RS_PERSISTENT/will mark the slot as RS_PERSISTENT/

~~~

6. drop_synced_slots_internal
+/*
+ * Helper function for drop_obsolete_slots()
+ *
+ * Drops synced slot identified by the passed in name.
+ */
+static void
+drop_synced_slots_internal(const char *name, bool nowait)
+{
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotAcquire(name, nowait);
+
+ Assert(MyReplicationSlot->data.synced);
+
+ ReplicationSlotDropAcquired();
+}

IMO you don't need this function. AFAICT it is only called from one
place and does not result in fewer lines of code.

~~~

7. get_local_synced_slots

+ /* Check if it is logical synchronized slot */
+ if (s->in_use && SlotIsLogical(s) && s->data.synced)
+ {
+ local_slots = lappend(local_slots, s);
+ }

Do you need to check SlotIsLogical(s) here? I thought s->data.synced
can never be true for physical slots. I felt you could write this like
below:

if (s->in_use && s->data.synced)
{
  Assert(SlotIsLogical(s));
  local_slots = lappend(local_slots, s);
}
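
To try the assert-instead-of-test idea outside the server, here is a
minimal standalone sketch. It is not PostgreSQL code: DemoSlot,
demo_slot_is_logical() and demo_count_synced_slots() are hypothetical
stand-ins for ReplicationSlot, SlotIsLogical() and the loop in
get_local_synced_slots(), and it assumes (as above) that 'synced' is
never set for a physical slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ReplicationSlot; only the fields used here. */
typedef struct DemoSlot
{
	bool		in_use;
	bool		synced;
	bool		logical;	/* what SlotIsLogical() would report */
} DemoSlot;

/* Hypothetical stand-in for SlotIsLogical(). */
static bool
demo_slot_is_logical(const DemoSlot *s)
{
	return s->logical;
}

/*
 * Count the in-use synced slots. The "is logical" property is asserted
 * rather than tested, on the assumption that a synced slot is always
 * logical.
 */
static int
demo_count_synced_slots(const DemoSlot *slots, size_t nslots)
{
	int			count = 0;

	for (size_t i = 0; i < nslots; i++)
	{
		const DemoSlot *s = &slots[i];

		if (s->in_use && s->synced)
		{
			assert(demo_slot_is_logical(s));
			count++;
		}
	}

	return count;
}
```

With asserts enabled, a physical slot that somehow had 'synced' set
would trip the assertion instead of being silently skipped.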

~~~

8. check_sync_slot_on_remote

+static bool
+check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots,
+   bool *locally_invalidated)
+{
+ ListCell   *lc;
+
+ foreach(lc, remote_slots)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);

I think you can use the new style foreach_ptr list macros here.
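
In the tree, the loop would presumably become something like
"foreach_ptr(RemoteSlot, remote_slot, remote_slots)", which removes
both the ListCell variable and the lfirst() cast. Since that cannot be
compiled outside the server, below is a standalone sketch of the same
loop shape. Everything prefixed with demo_/Demo is a hypothetical,
simplified stand-in (an array-backed list and a much reduced macro),
not the real pg_list.h definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for PostgreSQL's List. */
typedef struct DemoList
{
	int			length;
	void	  **elements;
} DemoList;

/* Hypothetical stand-in for RemoteSlot; only the name is modelled. */
typedef struct DemoRemoteSlot
{
	const char *name;
} DemoRemoteSlot;

/*
 * Reduced imitation of foreach_ptr(type, var, lst): declares "var" as
 * "type *" and walks the list with no separate cell variable. Note that
 * "lst" is evaluated more than once here.
 */
#define demo_foreach_ptr(type, var, lst) \
	for (int var##__i = 0, var##__once = 1; var##__once; var##__once = 0) \
		for (type *var = NULL; \
			 var##__i < (lst)->length && \
			 ((var = (type *) (lst)->elements[var##__i]), true); \
			 var##__i++)

/* Return true if a slot with the given name is in the remote list. */
static bool
demo_slot_exists(const char *name, const DemoList *remote_slots)
{
	assert(name != NULL && remote_slots != NULL);

	demo_foreach_ptr(DemoRemoteSlot, remote_slot, remote_slots)
	{
		if (strcmp(remote_slot->name, name) == 0)
			return true;
	}

	return false;
}
```

The same shape would apply to the other foreach() loops mentioned in
this post.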

~~~

9. drop_obsolete_slots

+drop_obsolete_slots(List *remote_slot_list)
+{
+ List    *local_slots = NIL;
+ ListCell   *lc;
+
+ local_slots = get_local_synced_slots();
+
+ foreach(lc, local_slots)
+ {
+ ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc);

I think you can use the new style foreach_ptr list macros here.

~~~

10. reserve_wal_for_slot

+ Assert(slot != NULL);
+ Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

You can use the macro XLogRecPtrIsInvalid(slot->data.restart_lsn).

~~~

11. update_and_persist_slot

+/*
+ * Update the LSNs and persist the slot for further syncs if the remote
+ * restart_lsn and catalog_xmin have caught up with the local ones. Otherwise,
+ * persist the slot and return.
+ *
+ * Return true if the slot is marked READY, otherwise false.
+ */
+static bool
+update_and_persist_slot(RemoteSlot *remote_slot)

11a.
The comment says "Otherwise, persist the slot and return", but the
"return false" path doesn't seem to persist anything, which seems
contrary to the comment.

~

11b.
"slot is marked READY" -- IIUC the synced states no longer exist in
v58, so this comment probably should not refer to READY anymore. Or
maybe there just needs to be more explanation about the difference
between 'synced' and the state you call "READY".

~~~

12. synchronize_one_slot

+ * The slot is created as a temporary slot and stays in same state until the
+ * initialization is complete. The initialization is considered to be completed
+ * once the remote_slot catches up with locally reserved position and local
+ * slot is updated. The slot is then persisted.

I think this comment is related to the "READY" mentioned by
update_and_persist_slot. Still, perhaps the terminology needs to be
made consistent across all these comments -- e.g. "considered to be
completed" versus "READY" versus "sync-ready" etc.

~~~

13.
+ ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
+   remote_slot->two_phase,
+   remote_slot->failover,
+   true);


This review comment is similar to elsewhere in this post. Consider
commenting on the new parameter like "true /* synced */"

~~~

14. synchronize_slots

+ /*
+ * It is possible to get null values for LSN and Xmin if slot is
+ * invalidated on the primary server, so handle accordingly.
+ */
+ remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ?
+ DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) :
+ InvalidXLogRecPtr;
+
+ remote_slot->restart_lsn = !slot_attisnull(tupslot, 4) ?
+ DatumGetLSN(slot_getattr(tupslot, 4, &isnull)) :
+ InvalidXLogRecPtr;
+
+ remote_slot->catalog_xmin = !slot_attisnull(tupslot, 5) ?
+ DatumGetTransactionId(slot_getattr(tupslot, 5, &isnull)) :
+ InvalidTransactionId;

Isn't this the same functionality as the older v51 code that was
written differently? I felt the old style (without ignoring the
'isnull') was more readable.

v51
+ remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(tupslot, 3, &isnull));
+ if (isnull)
+ remote_slot->confirmed_lsn = InvalidXLogRecPtr;

v58
+ remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ?
+ DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) :
+ InvalidXLogRecPtr;

If you prefer a ternary, it might be cleaner to do it like:

Datum d;
...
d = slot_getattr(tupslot, 3, &isnull);
remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
...
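
For anyone wanting to play with this pattern outside the server, here
is a standalone sketch of "fetch once, then decide on isnull". It is
only a model: DemoTuple, demo_getattr() and demo_fetch_lsn() are
hypothetical stand-ins for the tuple slot, slot_getattr() and the
assignment above, and DEMO_INVALID_LSN plays the role of
InvalidXLogRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t DemoLSN;

#define DEMO_INVALID_LSN ((DemoLSN) 0)

/* Hypothetical stand-in for a tuple: parallel value and null arrays. */
typedef struct DemoTuple
{
	int			natts;
	const uint64_t *values;
	const bool *nulls;
} DemoTuple;

/*
 * Hypothetical stand-in for slot_getattr(): attnum is 1-based and the
 * NULL-ness of the attribute is reported through *isnull.
 */
static uint64_t
demo_getattr(const DemoTuple *tup, int attnum, bool *isnull)
{
	assert(attnum >= 1 && attnum <= tup->natts);

	*isnull = tup->nulls[attnum - 1];
	return *isnull ? 0 : tup->values[attnum - 1];
}

/* Fetch the attribute once and use 'isnull' to choose the result. */
static DemoLSN
demo_fetch_lsn(const DemoTuple *tup, int attnum)
{
	bool		isnull;
	uint64_t	d = demo_getattr(tup, attnum, &isnull);

	return isnull ? DEMO_INVALID_LSN : (DemoLSN) d;
}
```

The point is that 'isnull' is actually consulted, so there is no need
for a separate null check before the fetch.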

~~~

15.
+
+ /* Drop local slots that no longer need to be synced. */
+ drop_obsolete_slots(remote_slot_list);
+
+ /* Now sync the slots locally */
+ foreach(lc, remote_slot_list)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+ some_slot_updated |= synchronize_one_slot(wrconn, remote_slot);
+ }

Here you can use the new list macro like foreach_ptr.

~~~

16. ReplSlotSyncWorkerMain

+ wrconn = walrcv_connect(PrimaryConnInfo, true, false,
+ cluster_name[0] ? cluster_name : "slotsyncworker",
+ &err);
+ if (wrconn == NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err));


Typically, other PG code uses "if (!wrconn)" instead of "if
(wrconn == NULL)".


======
src/backend/replication/slotfuncs.c

17. create_physical_replication_slot

  ReplicationSlotCreate(name, false,
    temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-   false);
+   false, false);

IMO passing parameters like "false, false, false" becomes a bit
difficult to understand from the caller's POV so it might be good to
comment on the parameter like:

ReplicationSlotCreate(name, false,
  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
  false, false /* synced */);

(there are a few other places like this where the same review comment applies)

~~~

18. create_logical_replication_slot

  ReplicationSlotCreate(name, true,
    temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-   failover);
+   failover, false);

Same as above. Maybe comment on the parameter like "false /* synced */"

~~~

19. pg_get_replication_slots

  case RS_INVAL_WAL_REMOVED:
- values[i++] = CStringGetTextDatum("wal_removed");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
  break;

  case RS_INVAL_HORIZON:
- values[i++] = CStringGetTextDatum("rows_removed");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
  break;

  case RS_INVAL_WAL_LEVEL:
- values[i++] = CStringGetTextDatum("wal_level_insufficient");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
  break;

IMO this code and the #defines that it uses can be written and pushed
as an independent patch.

======
src/backend/replication/walsender.c

20. CreateReplicationSlot

  ReplicationSlotCreate(cmd->slotname, false,
    cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-   false, false);
+   false, false, false);

Consider commenting the parameter like "false /* synced */"

~~~

21.
  ReplicationSlotCreate(cmd->slotname, true,
    cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-   two_phase, failover);
+   two_phase, failover, false);

Consider commenting the parameter like "false /* synced */"

======
src/include/replication/slot.h

22.
+/*
+ * The possible values for 'conflict_reason' returned in
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT     "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT   "wal_level_insufficient"

IMO these #defines and also the code in pg_get_replication_slots()
that uses them can be written and pushed as an independent patch.

======
.../t/050_standby_failover_slots_sync.pl

23.
+# Wait for the standby to start sync
+$standby1->start;

But there is no waiting here? Maybe the comment should say like "Start
the standby so that slot syncing can begin"

~~~

24.
+# Wait for the standby to finish sync
+my $offset = -s $standby1->logfile;
+$standby1->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? newly locally created slot \"lsub1_slot\" is sync-ready now/,
+ $offset);

SUGGESTION
# Wait for the standby to finish slot syncing

~~~

25.
+# Confirm that logical failover slot is created on the standby and is sync
+# ready.
+is($standby1->safe_psql('postgres',
+ q{SELECT failover, synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}),
+ "t|t",
+ 'logical slot has failover as true and synced as true on standby');

SUGGESTION
# Confirm that the logical failover slot is created on the standby and
# is flagged as 'synced'

~~~

26.
+$subscriber1->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_int (a int PRIMARY KEY);
+ ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION;
+]);
+
+$subscriber1->wait_for_subscription_sync;

Add a comment like

# Subscribe to the new table data and wait for it to arrive

~~~

27.
+# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise
+# the concerned testing scenarios here may be interrupted by different error:
+# 'ERROR:  replication slot is active for PID ..'
+
+$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$standby1->restart;

Remove the blank line.

~~~

28.
+is($standby1->safe_psql('postgres',
+ q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}),
+ 'lsub1_slot',
+ 'synced slot retained on the new primary');

There should be some comment like:

SUGGESTION
# Confirm the synced slot 'lsub1_slot' is retained on the new primary

~~~

29.
+# Confirm that data in tab_int replicated on subscriber
+is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
+ "20",
+ 'data replicated from the new primary');

/replicated on subscriber/replicated on the subscriber/


======
Kind Regards,
Peter Smith.
Fujitsu Australia


