diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3ac4a4b..cba6661 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2050,21 +2050,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
- TWO_PHASE [ boolean ]
-
-
- If true, this logical replication slot supports decoding of two-phase
- commit. With this option, commands related to two-phase commit such as
- PREPARE TRANSACTION, COMMIT PREPARED
- and ROLLBACK PREPARED are decoded and transmitted.
- The transaction will be decoded and transmitted at
- PREPARE TRANSACTION time.
- The default is false.
-
-
-
-
-
RESERVE_WAL [ boolean ]
@@ -2104,6 +2089,21 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
+
+
+ TWO_PHASE [ boolean ]
+
+
+ If true, this logical replication slot supports decoding of two-phase
+ commit. With this option, commands related to two-phase commit such as
+ PREPARE TRANSACTION, COMMIT PREPARED
+ and ROLLBACK PREPARED are decoded and transmitted.
+ The transaction will be decoded and transmitted at
+ PREPARE TRANSACTION time.
+ The default is false.
+
+
+
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 35bce68..f3c6e1f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2741,5 +2741,6 @@ LookupGXactBySubid(Oid subid)
}
}
LWLockRelease(TwoPhaseStateLock);
+
return found;
}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6995a62..3703cf6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1076,6 +1076,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
{
StringInfoData cmd;
+ Assert(strcmp(option, "two_phase") == 0 || strcmp(option, "failover") == 0); /* exact option names only */
+
if (!sub->slotname)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1098,8 +1100,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
/*
- * The changed option of the slot can't be rolled back: prevent we are in
- * the transaction state.
+ * The changed option of the slot can't be rolled back, so disallow if we
+ * are in a transaction block.
*/
PreventInTransactionBlock(isTopLevel, cmd.data);
@@ -1282,7 +1284,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/*
* Workers may still survive even if the subscription has
* been disabled. They may read the pg_subscription
- * catalog and detect that the twophase parameter is
+ * catalog and detect that the two_phase parameter is
* updated, which causes the assertion failure. Ensure
* workers have already been exited to avoid it.
*/
@@ -1304,7 +1306,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
errmsg("cannot disable two_phase when prepared transactions are present"),
errhint("Resolve these transactions and try again.")));
- /* Change system catalog acoordingly */
+ /* Change system catalog accordingly */
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 45744b7..c566d50 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -272,15 +272,15 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
* the subscription, instead of just one.
*/
List *
-logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
+logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
{
int i;
List *res = NIL;
- if (require_lock)
+ if (acquire_lock)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- else
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
@@ -291,7 +291,7 @@ logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
res = lappend(res, w);
}
- if (require_lock)
+ if (acquire_lock)
LWLockRelease(LogicalRepWorkerLock);
return res;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2f167a2..e75f24b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -842,23 +842,25 @@ ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase)
" on the standby"));
}
- /*
- * Do not allow users to enable failover for temporary slots as we do not
- * support syncing temporary slots to the standby.
- */
- if (failover && *failover &&
- MyReplicationSlot->data.persistency == RS_TEMPORARY)
+ if (failover)
+ {
+ /*
+ * Do not allow users to enable failover for temporary slots as we do not
+ * support syncing temporary slots to the standby.
+ */
+ if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary replication slot"));
- if (failover && MyReplicationSlot->data.failover != *failover)
- {
- SpinLockAcquire(&MyReplicationSlot->mutex);
- MyReplicationSlot->data.failover = *failover;
- SpinLockRelease(&MyReplicationSlot->mutex);
+ if (MyReplicationSlot->data.failover != *failover)
+ {
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->data.failover = *failover;
+ SpinLockRelease(&MyReplicationSlot->mutex);
- update_slot = true;
+ update_slot = true;
+ }
}
if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 04f65e0..af8e958 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1405,56 +1405,42 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
}
+
/*
- * Process extra options given to ALTER_REPLICATION_SLOT.
+ * Change the definition of a replication slot.
*/
static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd,
- bool *failover_given, bool *failover,
- bool *two_phase_given, bool *two_phase)
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
- *failover_given = false;
- *two_phase_given = false;
+ bool failover_given = false;
+ bool two_phase_given = false;
+ bool failover;
+ bool two_phase;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
{
if (strcmp(defel->defname, "failover") == 0)
{
- if (*failover_given)
+ if (failover_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *failover_given = true;
- *failover = defGetBoolean(defel);
+ failover_given = true;
+ failover = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
- if (*two_phase_given)
+ if (two_phase_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *two_phase_given = true;
- *two_phase = defGetBoolean(defel);
+ two_phase_given = true;
+ two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
-}
-
-/*
- * Change the definition of a replication slot.
- */
-static void
-AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
-{
- bool failover_given;
- bool two_phase_given;
- bool failover;
- bool two_phase;
-
- ParseAlterReplSlotOptions(cmd, &failover_given, &failover,
- &two_phase_given, &two_phase);
ReplicationSlotAlter(cmd->slotname,
failover_given ? &failover : NULL,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 990f524..9646261 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -241,7 +241,7 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
- bool require_lock);
+ bool acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 51fa4b9..40e1a07 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,7 +377,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a3886d7..b64f419 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,7 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
\dRs+
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 4e8f627..66265c7 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -371,8 +371,8 @@ is($result, qq(2), 'replicated data in subscriber table');
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
###############################
-# Disable the subscription and alter it to two_phase = false,
-# then verify that the altered subscription reflects the two_phase option.
+# Alter the subscription to two_phase = false.
+# Verify that the altered subscription reflects the two_phase option.
###############################
# Alter subscription two_phase to false
@@ -395,7 +395,10 @@ $result = $node_subscriber->safe_psql('postgres',
);
is($result, qq(d), 'two-phase should be disabled');
-# Now do a prepare on the publisher and make sure that it is not replicated.
+###############################
+# Now do a prepare on the publisher.
+# Verify that it is not replicated.
+###############################
$node_publisher->safe_psql(
'postgres', qq{
BEGIN;
@@ -411,7 +414,10 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'should be no prepared transactions on subscriber');
-# Now commit the insert and verify that it is replicated
+###############################
+# Now commit the insert.
+# Verify that it is replicated.
+###############################
$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
# Wait for the subscriber to catchup
@@ -422,7 +428,10 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
is($result, qq(3), 'replicated data in subscriber table');
-# Alter subscription two_phase to true
+###############################
+# Alter the subscription to two_phase = true.
+# Verify that the altered subscription reflects the two_phase option.
+###############################
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
$node_subscriber->poll_query_until('postgres',