Re: Synchronizing slots from primary to standby - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Synchronizing slots from primary to standby
Date
Msg-id CAHut+PsFs=Qyi+qBXixc78Hc6DeKvS2n5+Qpk1Perhq73j+Log@mail.gmail.com
Whole thread Raw
In response to Re: Synchronizing slots from primary to standby  (shveta malik <shveta.malik@gmail.com>)
Responses Re: Synchronizing slots from primary to standby
List pgsql-hackers
Hi Shveta. Here are some comments for patch v14-0002

The patch is large, so my code review is a WIP... more later next week...

======
GENERAL

1. Patch size

The patch is 2700 lines.  Is it possible to break this up into smaller
self-contained parts to make the reviews more manageable?

~~~

2. PG Docs

I guess there are missing PG Docs for this patch. E.g there are new
GUCs added but I see no documentation yet for them.

~

3. Terms

There are variations of what to call the sync worker
- "Slot sync worker" or
- "slot-sync worker" or
- "slot synchronization worker" or
- "slot-synchronization worker"
- and others

These are all in the comments and messages etc. Better to
search/replace to make a consistent term everywhere.

FWIW, I preferred just to call it "slot-sync worker".

~

4. typedefs

I think multiple new typedefs are added by this patch. IIUC, those
should be included in the file typedef.list so the pg_indent will work
properly.

5. max_slot_sync_workers GUC

There is already a 'max_sync_workers_per_subscription', but that one
is for "tablesync" workers. IMO it is potentially confusing now that
both these GUCs have 'sync_workers' in the name. I think it would be
less ambiguous to change your new GUC to 'max_slotsync_workers'.

======
Commit Message

6. Overview?

I felt the information in this commit message is describing details of
what changes are in this patch but there is no synopsis about the
*purpose* of this patch as a whole. Eg. What is it for?

It seemed like there should be some introductory paragraph up-front
before describing all the specifics.

~~~

7.
For slots to be synchronised, another GUC is added:
synchronize_slot_names: This is a runtime modifiable GUC.

~

If this is added by this patch then how come there is some SGML
describing the same GUC in patch 14-0001? What is the relationship?

~~~

8.
Let us say slots mentioned in 'synchronize_slot_names' on primary belongs to
10 DBs and say the new GUC is set at default value of 2, then each worker
will manage 5 dbs and will keep on synching the slots for them.

~

/the new GUC is set at default value of 2/'max_slot_sync_workers' is 2/

~~~

9.
If a new
DB is found by replication launcher, it will assign this new db to
the worker handling the minimum number of dbs currently (or first
worker in case of equal count)

~

Hmm. Isn't this only describing cases where max_slot_workers was
exceeded? Otherwise, you should just launch a brand new sync-worker,
right?

~~~

10.
Each worker slot will have its own dbids list.

~

It seems confusing to say "worker slot" when already talking about
workers and slots. Can you reword that more like "Each slot-sync
worker will have its own dbids list"?

======
src/backend/postmaster/bgworker.c

======
.../libpqwalreceiver/libpqwalreceiver.c

11. libpqrcv_list_db_for_logical_slots

+/*
+ * List DB for logical slots
+ *
+ * It gets the list of unique DBIDs for logical slots mentioned in slot_names
+ * from primary.
+ */
+static List *
+libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn,

Comment needs some minor tweaking.

~~~

12.
+ if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+ {
+ char    *rawname;
+ List    *namelist;
+ ListCell   *lc;
+
+ appendStringInfoChar(&s, ' ');
+ rawname = pstrdup(slot_names);
+ SplitIdentifierString(rawname, ',', &namelist);
+ foreach (lc, namelist)
+ {
+ if (lc != list_head(namelist))
+ appendStringInfoChar(&s, ',');
+ appendStringInfo(&s, "%s",
+ quote_identifier(lfirst(lc)));
+ }
+ }

/rawname/rawnames/

~~~

13.
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not receive list of slots the primary server: %s",
+ pchomp(PQerrorMessage(conn->streamConn)))));
+ }

/the primary server/from the primary server/

~~~

14.
+ if (PQnfields(res) < 1)
+ {
+ int nfields = PQnfields(res);
+
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("Could not get list of slots: got %d fields, "
+ "expected %d or more fields.",
+    nfields, 1)));
+ }

This code seems over-complicated. If it is < 1 then it can only be
zero, right? So then what is the point of calculating and displaying
the 'nfields' which can only be 0?

~~~

15.
+ ntuples = PQntuples(res);
+ for (int i = 0; i < ntuples; i++)
+ {
+
+ slot_data = palloc0(sizeof(WalRecvReplicationSlotDbData));
+ if (!PQgetisnull(res, i, 0))
+ slot_data->database = atooid(PQgetvalue(res, i, 0));
+
+ slot_data->last_sync_time = 0;
+ slotlist = lappend(slotlist, slot_data);
+ }

15a.
Unnecessary blank line in for-block.

~~~

15b.
Unnecessary assignment to 'last_sync_time' because the whole structure
was palloc0 just 2 lines above.

======
src/backend/replication/logical/Makefile

======
src/backend/replication/logical/launcher.c

16.
+/*
+ * Initial and incremental allocation size for dbids array for each
+ * SlotSyncWorker in dynamic shared memory i.e. we start with this size
+ * and once it is exhausted, dbids is rellocated with size incremented
+ * by ALLOC_DB_PER_WORKER
+ */
+#define ALLOC_DB_PER_WORKER 100


I felt it might be simpler to just separate these values instead of
having to describe how you make use of the same constant for 2
meanings

For example,

#define DB_PER_WORKER_ALLOC_INIT 100
#define DB_PER_WORKER_ALLOC_EXTRA 100

~~~

17.
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);

-
 /*

Unnecessary whitespace change.

~~~

18. ApplyLauncherShmemInit

  bool found;
+ bool foundSlotSync;

I think it is simpler to just use the same 'found' variable again.

~~

19. ApplyLauncherShmemInit

+ /* Allocate shared-memory for slot-sync workers pool now */
+ LogicalRepCtx->ss_workers = (SlotSyncWorker *)
+ ShmemInitStruct("Replication slot synchronization workers",
+ mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker)),
+ &foundSlotSync);
+
+ if (!foundSlotSync)
+ {
+ int slot;
+
+ for (slot = 0; slot < max_slot_sync_workers; slot++)
+ {
+ SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[slot];
+
+ memset(worker, 0, sizeof(SlotSyncWorker));
+ }
+ }

Why is the memset in a loop? Can't we just zap the whole ss_workers
array in one go using that same mul_size Size?

SUGGESTION

Size ssw_size = mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker));

if (!found)
  memset(LogicalRepCtx->ss_workers, 0, ssw_size);


======
.../replication/logical/logicalfuncs.c

======
src/backend/replication/logical/meson.build

======
src/backend/replication/logical/slotsync.c

======
src/backend/replication/logical/tablesync.c

======
src/backend/replication/repl_gram.y

======
src/backend/replication/repl_scanner.l

======
src/backend/replication/slot.c

======
src/backend/replication/walsender.c

======
src/backend/storage/lmgr/lwlocknames.txt

======
.../utils/activity/wait_event_names.txt

======
src/backend/utils/misc/guc_tables.c

20.
+ {
+ {"max_slot_sync_workers",
+ PGC_SIGHUP,
+ REPLICATION_STANDBY,
+ gettext_noop("Maximum number of slots synchronization workers "
+ "on a standby."),
+ NULL,
+ },
+ &max_slot_sync_workers,
+ 2, 0, MAX_SLOT_SYNC_WORKER_LIMIT,
+ NULL, NULL, NULL
+ },
+

/slots synchronization/slot synchronization/

OR

/slots synchronization/slot-sync/

======
src/backend/utils/misc/postgresql.conf.sample

21.
+#max_slot_sync_workers = 2 # max number of slot synchronization workers


Should this comment match the guc_tables.c text. E.g should it say
"... on a standby"

======
src/include/commands/subscriptioncmds.h

======
src/include/nodes/replnodes.h

======
src/include/postmaster/bgworker_internals.h

======
src/include/replication/logicallauncher.h

======
src/include/replication/logicalworker.h

======
src/include/replication/slot.h

22.
+
+ /*
+ * Is standby synced slot?
+ */
+ bool synced;
 } ReplicationSlotPersistentData;

Comment is unclear:
- does it mean "has this primary slot been synsc to standby" ?
- does it mean "this is a slot created by a sync-slot worker"?
- something else?

======
src/include/replication/walreceiver.h

23.
+/*
+ * Slot's DBids receiver from remote.
+ */
+typedef struct WalRecvReplicationSlotDbData
+{
+ Oid database;
+ TimestampTz last_sync_time;
+} WalRecvReplicationSlotDbData;
+

Is that comment correct? Or should it be more like "The slot's DBid
received from remote.". Anyway, that comment seems more for the
'database' field only, not a structure-level comment.

~~~

24.
  walrcv_get_conninfo_fn walrcv_get_conninfo;
  walrcv_get_senderinfo_fn walrcv_get_senderinfo;
  walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_list_db_for_logical_slots_fn walrcv_list_db_for_logical_slots;
  walrcv_server_version_fn walrcv_server_version;
  walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
  walrcv_startstreaming_fn walrcv_startstreaming;
This function name doesn't seem consistent with the existing names.
Something like 'walrcv_get_dbinfo_for_logical_slots_fn' might be
better?
======
src/include/replication/worker_internal.h

25.
+typedef struct SlotSyncWorkerWatchSlot
+{
+ NameData slot_name;
+ XLogRecPtr confirmed_lsn;
+ int inactivity_count;
+} SlotSyncWorkerWatchSlot;

I did not find any reference to this typedef except in the following
struct for SlotSyncWorker. So why not just make this a nested
structure within 'SlotSyncWorker' instead?

~~~

26.
+typedef struct SlotSyncWorker
+{
+ /* Time at which this worker was launched. */
+ TimestampTz launch_time;
+
+ /* Indicates if this slot is used or free. */
+ bool in_use;
+
+ /* The slot in worker pool to which it is attached */
+ int slot;
+
+ /* Increased every time the slot is taken by new worker. */
+ uint16 generation;
+
+ /* Pointer to proc array. NULL if not running. */
+ PGPROC    *proc;
+
+ /* User to use for connection (will be same as owner of subscription). */
+ Oid userid;
+
+ /* Database id to connect to. */
+ Oid dbid;
+
+ /* Count of Database ids it manages */
+ uint32 dbcount;
+
+ /* DSA for dbids */
+ dsa_area *dbids_dsa;
+
+ /* dsa_pointer for database ids it manages */
+ dsa_pointer dbids_dp;
+
+ /* Mutex to access dbids in dsa */
+ slock_t         mutex;
+
+ /* Info about slot being monitored for worker's naptime purpose */
+ SlotSyncWorkerWatchSlot monitor;
+} SlotSyncWorker;

There seems an awful lot about this struct which is common with
'LogicalRepWorker' struct.

It seems a shame not to make use of the commonality instead of all the
cut/paste here.

E.g. Can it be rearranged so all these common fields are shared:
- launch_time
- in_use
- slot
- generation
- proc
- userid
- dbid

======
src/include/storage/lwlock.h

27.
  LWTRANCHE_LAUNCHER_HASH,
- LWTRANCHE_FIRST_USER_DEFINED
+ LWTRANCHE_FIRST_USER_DEFINED,
+ LWTRANCHE_SLOT_SYNC_DSA
 } BuiltinTrancheIds;

Isn't 'LWTRANCHE_FIRST_USER_DEFINED' supposed to the be last enum?

======
src/test/recovery/meson.build

======
src/test/recovery/t/051_slot_sync.pl

28.
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;

Wrong copyright date

~~~

29.
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');

29a.
Can't all the subroutines be up-front? Then this can move to be with
the other node initialisation code that comets next.

~

29b.
Add a comment something like # Setup nodes

~~~

30.
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+ my $res = $node_phys_standby->safe_psql(
+ 'postgres', qq(
+ select bool_and(conflicting) from pg_replication_slots;));
+
+ is($res, 't',
+ "Logical slot is reported as conflicting");
+}

Doesn't bool_and() mean returns false if only some but not all slots
are conflicting - is that intentional?> Or is this sub-routine only
expecting to test one slot, in which case maybe the SQL should include
also the 'slot_name'?

~~~

31.
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT
pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup',
has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");

The comments seem mostly to describe details about what are the
expectations at each test step.

IMO there also needs to be a larger "overview" comment to describe
more generally *what* this is testing, and *how* it is testing it.
e.g. it is hard to understand the test without being already familiar
with the patch.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



pgsql-hackers by date:

Previous
From: "Ryo Matsumura (Fujitsu)"
Date:
Subject: PATCH: document for regression test forgets libpq test
Next
From: David Geier
Date:
Subject: Re: Eliminate redundant tuple visibility check in vacuum