Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Synchronizing slots from primary to standby
Date
Msg-id CAHut+Psf9z132WNgy0Gr10ZTnonpNjvTBj74wG8kSxXU4rOD7g@mail.gmail.com
In response to RE: Synchronizing slots from primary to standby  ("Zhijie Hou (Fujitsu)" <houzj.fnst@fujitsu.com>)
Responses Re: Synchronizing slots from primary to standby
RE: Synchronizing slots from primary to standby
List pgsql-hackers
Here are some review comments for v44-0001

======
src/backend/replication/slot.c


1. ReplicationSlotCreate

  *     during getting changes, if the two_phase option is enabled it can skip
  *     prepare because by that time start decoding point has been moved. So the
  *     user will only get commit prepared.
+ * failover: Allows the slot to be synced to physical standbys so that logical
+ *     replication can be resumed after failover.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,

~

/Allows the slot.../If enabled, allows the slot.../

======

2. validate_standby_slots

+validate_standby_slots(char **newval)
+{
+ char    *rawname;
+ List    *elemlist;
+ ListCell   *lc;
+ bool ok = true;
+
+ /* Need a modifiable copy of string */
+ rawname = pstrdup(*newval);
+
+ /* Verify syntax and parse string into list of identifiers */
+ if (!(ok = SplitIdentifierString(rawname, ',', &elemlist)))
+ GUC_check_errdetail("List syntax is invalid.");
+
+ /*
+ * If there is a syntax error in the name or if the replication slots'
+ * data is not initialized yet (i.e., we are in the startup process), skip
+ * the slot verification.
+ */
+ if (!ok || !ReplicationSlotCtl)
+ {
+ pfree(rawname);
+ list_free(elemlist);
+ return ok;
+ }


2a.
You don't need to initialize 'ok' during declaration because it is
assigned immediately anyway.

~

2b.
AFAIK assignment within a conditional like this is not a normal PG
coding style unless there is no other way to do it.

~

2c.
/into list/into a list/

SUGGESTION
/* Verify syntax and parse string into a list of identifiers */
ok = SplitIdentifierString(rawname, ',', &elemlist);
if (!ok)
  GUC_check_errdetail("List syntax is invalid.");
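
For illustration only, the shape suggested in 2a-2c can be tried outside
the PostgreSQL tree. This is a minimal sketch under stated assumptions:
split_names() and validate_names() are hypothetical stand-ins (not
SplitIdentifierString() or the patch's function), and the stand-in
rejects empty elements only.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical stand-in for SplitIdentifierString(): returns false if any
 * comma-separated element is empty, otherwise stores the element count.
 */
static bool
split_names(const char *raw, int *count)
{
	const char *p = raw;
	int			n = 0;

	for (;;)
	{
		size_t		len = strcspn(p, ",");

		if (len == 0)
			return false;		/* empty element: syntax error */
		n++;
		if (p[len] == '\0')
			break;				/* reached the end of the list */
		p += len + 1;			/* skip past the comma */
	}

	*count = n;
	return true;
}

/* Assign 'ok' on its own line, then test it (no assignment in the condition). */
static bool
validate_names(const char *newval, const char **errdetail)
{
	int			count = 0;
	bool		ok;				/* no initializer: assigned immediately below */

	/* Verify syntax and parse string into a list of identifiers */
	ok = split_names(newval, &count);
	if (!ok)
		*errdetail = "List syntax is invalid.";

	return ok;
}
```

The behaviour is the same as the original one-liner; only the assignment
and the test are separated.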


~~~

3. assign_standby_slot_names

+ if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots))
+ {
+ /* This should not happen if GUC checked check_standby_slot_names. */
+ elog(ERROR, "list syntax is invalid");
+ }

The error message here differs from the one in validate_standby_slots()
-- "list" versus "List".

======
src/backend/replication/walsender.c


4. WalSndFilterStandbySlots


+ foreach(lc, standby_slots_cpy)
+ {
+ char    *name = lfirst(lc);
+ XLogRecPtr restart_lsn = InvalidXLogRecPtr;
+ bool invalidated = false;
+ char    *warningfmt = NULL;
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, true);
+
+ if (slot && SlotIsPhysical(slot))
+ {
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ SpinLockRelease(&slot->mutex);
+ }
+
+ /* Continue if the current slot hasn't caught up. */
+ if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) &&
+ restart_lsn < wait_for_lsn)
+ {
+ /* Log warning if no active_pid for this physical slot */
+ if (slot->active_pid == 0)
+ ereport(WARNING,
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+    name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the "
+   "standby associated with \"%s\"", name),
+ errhint("Consider starting standby associated with "
+ "\"%s\" or amend standby_slot_names", name));
+
+ continue;
+ }
+ else if (!slot)
+ {
+ /*
+ * It may happen that the slot specified in standby_slot_names GUC
+ * value is dropped, so let's skip over it.
+ */
+ warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring");
+ }
+ else if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in standby_slot_names, issue
+ * a WARNING and skip it. Although logical slots are disallowed in
+ * the GUC check_hook(validate_standby_slots), it is still
+ * possible for a user to drop an existing physical slot and
+ * recreate a logical slot with the same name. Since it is
+ * harmless, a WARNING should be enough, no need to error-out.
+ */
+ warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring");
+ }
+ else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
+ {
+ /*
+ * Specified physical slot may have been invalidated, so there is no point
+ * in waiting for it.
+ */
+ warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring");
+ }
+ else
+ {
+ Assert(restart_lsn >= wait_for_lsn);
+ }

This if/else chain seems awkwardly structured. IMO it would be tidier
to handle the NULL slot and SlotIsLogical cases up-front, which would
also simplify some of the subsequent conditions.

SUGGESTION

slot = SearchNamedReplicationSlot(name, true);

if (!slot)
{
...
}
else if (SlotIsLogical(slot))
{
...
}
else
{
  Assert(SlotIsPhysical(slot));

  SpinLockAcquire(&slot->mutex);
  restart_lsn = slot->data.restart_lsn;
  invalidated = slot->data.invalidated != RS_INVAL_NONE;
  SpinLockRelease(&slot->mutex);

  if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
  {
  ...
  }
  else if (restart_lsn < wait_for_lsn)
  {
  ...
  }
  else
  {
    Assert(restart_lsn >= wait_for_lsn);
  }
}
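
To check that the restructured chain covers every case, here is a
self-contained sketch of the same branch order. Everything in it is an
assumption made for illustration: MockSlot, MockLsn, SlotVerdict and
classify_standby_slot() are hypothetical stand-ins, not PostgreSQL
definitions, and the spinlock and warning reporting are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL types; not the real definitions. */
typedef uint64_t MockLsn;
#define MOCK_INVALID_LSN ((MockLsn) 0)

typedef struct MockSlot
{
	bool		is_logical;		/* stands in for SlotIsLogical(slot) */
	bool		invalidated;	/* stands in for invalidated != RS_INVAL_NONE */
	MockLsn		restart_lsn;	/* stands in for data.restart_lsn */
} MockSlot;

typedef enum SlotVerdict
{
	SLOT_MISSING,				/* warn and ignore: slot was dropped */
	SLOT_LOGICAL,				/* warn and ignore: not a physical slot */
	SLOT_INVALIDATED,			/* warn and ignore: nothing to wait for */
	SLOT_LAGGING,				/* keep waiting for this standby */
	SLOT_CAUGHT_UP				/* restart_lsn has reached wait_for_lsn */
} SlotVerdict;

static SlotVerdict
classify_standby_slot(const MockSlot *slot, MockLsn wait_for_lsn)
{
	/* Handle the NULL and logical cases up-front. */
	if (slot == NULL)
		return SLOT_MISSING;
	if (slot->is_logical)
		return SLOT_LOGICAL;

	/* Only physical slots reach this point. */
	if (slot->invalidated || slot->restart_lsn == MOCK_INVALID_LSN)
		return SLOT_INVALIDATED;

	/* The slot is known to be valid here, so only the LSN test remains. */
	if (slot->restart_lsn < wait_for_lsn)
		return SLOT_LAGGING;

	assert(slot->restart_lsn >= wait_for_lsn);
	return SLOT_CAUGHT_UP;
}
```

Because the invalidated case is tested first inside the physical-slot
branch, the "hasn't caught up" test reduces to the LSN comparison alone.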

~~~~

5. WalSndWaitForWal

+ else
+ {
+ /* already caught up and doesn't need to wait for standby_slots */
  break;
+ }

/already/Already/

======
src/test/recovery/t/050_standby_failover_slots_sync.pl


6.
+$subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab_int (a int PRIMARY KEY);");
+
+# Create a subscription with failover = true
+$subscriber1->safe_psql('postgres',
+     "CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' "
+   . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true);"
+);


Consider combining these DDL statements.

~~~

7.
+$subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber2->safe_psql('postgres',
+     "CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' "
+   . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);");

Consider combining these DDL statements.

~~~

8.
+# Stop the standby associated with specified physical replication slot so that
+# the logical replication slot won't receive changes until the standby slot's
+# restart_lsn is advanced or the slots is removed from the standby_slot_names
+# list
+$publisher->safe_psql('postgres', "TRUNCATE tab_int;");
+$publisher->wait_for_catchup('regress_mysub1');
+$standby1->stop;

/with specified/with the specified/

/or the slots is/or the slot is/

~~~

9.
+# Create some data on primary

/on primary/on the primary/

~~~

10.
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) = 10 FROM tab_int;");
+is($result, 't',
+ "subscriber1 doesn't get data as the sb1_slot doesn't catch up");


Instead of checking for 10, it may be more consistent with the previous
code to reassign the $primary_row_count variable to 20, and then check
that not all of those primary rows have been received yet, like:

SELECT count(*) < $primary_row_count FROM tab_int;

~~~

11.
+# Now that the standby lsn has advanced, primary must send the decoded
+# changes to the subscription.
+$publisher->wait_for_catchup('regress_mysub1');
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) = 20 FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
+);

/primary must/the primary must/

(continuing the suggestion from the previous review comment)

Now this SQL can use the variable too:

$subscriber1->safe_psql('postgres',
  "SELECT count(*) = $primary_row_count FROM tab_int;");

~~~

12.
+
+# Create another subscription enabling failover
+$subscriber1->safe_psql('postgres',
+     "CREATE SUBSCRIPTION regress_mysub3 CONNECTION '$publisher_connstr' "
+   . "PUBLICATION regress_mypub WITH (slot_name = lsub3_slot, copy_data=false, failover = true, create_slot = false);"
+);


Maybe give some more information in that comment:

SUGGESTION
Create another subscription (using the same slot created above) that
enables failover.

======
Kind Regards,
Peter Smith.
Fujitsu Australia


