RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | shiy.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OSZPR01MB6310269420CA5588D6B81518FD6D9@OSZPR01MB6310.jpnprd01.prod.outlook.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>) |
List | pgsql-hackers |
On Wed, Aug 17, 2022 2:28 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:
>
> On Tues, August 16, 2022 15:33 PM I wrote:
> > Attach the new patches.
>
> I found that cfbot has a failure.
> After investigation, I think it is because the worker's exit state is not set
> correctly. So I made some slight modifications.
>
> Attach the new patches.
>

Thanks for updating the patch. Here are some comments.

0003 patch
==============
1. src/backend/replication/logical/applybgworker.c
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot replicate target relation \"%s.%s\" using "
+                        "subscription parameter streaming=parallel",
+                        rel->remoterel.nspname, rel->remoterel.relname),
+                 errdetail("The unique column on subscriber is not the unique "
+                           "column on publisher or there is at least one "
+                           "non-immutable function."),
+                 errhint("Please change to use subscription parameter "
+                         "streaming=on.")));

Should we use "%s" instead of "streaming=parallel" and "streaming=on"?

2. src/backend/replication/logical/applybgworker.c
+     * If any worker is handling the streaming transaction, this check needs to
+     * be performed not only using the apply background worker, but also in the
+     * main apply worker. This is because without these restrictions, main

this check needs to be performed not only using the apply background worker, but
also in the main apply worker.
->
this check not only needs to be performed by apply background worker, but also
by the main apply worker

3. src/backend/replication/logical/relation.c
+    if (ukey)
+    {
+        i = -1;
+        while ((i = bms_next_member(ukey, i)) >= 0)
+        {
+            attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber);
+
+            if (entry->attrmap->attnums[attnum] < 0 ||
+                !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique))
+            {
+                entry->parallel_apply = PARALLEL_APPLY_UNSAFE;
+                return;
+            }
+        }
+
+        bms_free(ukey);

It looks like we need to call bms_free() before the return, right?

4. src/backend/replication/logical/relation.c
+        /* We don't need info for dropped or generated attributes */
+        if (att->attisdropped || att->attgenerated)
+            continue;

Would it be better to change the comment to:
We don't check dropped or generated attributes

5. src/test/subscription/t/032_streaming_apply.pl
+$node_publisher->wait_for_catchup($appname);
+
+# Then we check the foreign key on partition table.
+$node_publisher->wait_for_catchup($appname);

Here, wait_for_catchup() is called twice; we can remove the second one.

6. src/backend/replication/logical/applybgworker.c
+        /* If any workers (or the postmaster) have died, we have failed. */
+        if (status == APPLY_BGWORKER_EXIT)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("background worker %u failed to apply transaction %u",
+                            entry->wstate->shared->worker_id,
+                            entry->wstate->shared->stream_xid)));

Should we change the error message to "apply background worker %u failed to
apply transaction %u"? That would be consistent with the error message in
apply_bgworker_wait_for().

0004 patch
==============
1.
I saw that the commit message says:
If the subscriber exits with an error, this flag will be set true, and
whenever the transaction is applied successfully, this flag is reset false.

"subretry" is set to false if a transaction is applied successfully, which looks
similar to what clear_subscription_skip_lsn() does, so maybe we should remove
the following change in apply_handle_stream_abort()? Or only call
set_subscription_retry() when rolling back the toplevel transaction.

@@ -1671,6 +1688,9 @@ apply_handle_stream_abort(StringInfo s)
              */
             serialize_stream_abort(xid, subxid);
         }
+
+        /* Reset the retry flag. */
+        set_subscription_retry(false);
     }

     reset_apply_error_context_info();

2. src/backend/replication/logical/worker.c
+    /* Reset subretry */
+    values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+    replaces[Anum_pg_subscription_subretry - 1] = true;

/* Reset subretry */
->
/* Set subretry */

3.
+# Insert dependent data on the publisher, now it works.
+$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)");

If the DELETE change from the publisher has not been applied yet when the INSERT
is executed, the INSERT will fail.

0005 patch
==============
1.
+      <para>
+       Process ID of the main apply worker, if this process is a apply
+       background worker. NULL if this process is a main apply worker or a
+       synchronization worker.
+      </para></entry>

a apply background worker
->
an apply background worker

Regards,
Shi yu
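
For illustration of comment 3 on the 0003 patch (the early return that skips bms_free()), here is a minimal standalone sketch of one way to avoid that kind of leak: set a result flag, break out of the loop, and free on a single shared exit path. This is not the patch's code. ToySet, toy_make(), toy_free(), toy_is_member(), unique_keys_match() and the live_sets counter are all hypothetical stand-ins invented for this sketch; the real code would use Bitmapset and bms_free().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a bitmap set; NOT PostgreSQL's Bitmapset. */
typedef struct ToySet
{
    unsigned int bits;
} ToySet;

/* Number of sets allocated and not yet freed (used to detect leaks). */
static int live_sets = 0;

static ToySet *
toy_make(unsigned int bits)
{
    ToySet *s = malloc(sizeof(ToySet));

    if (s == NULL)
        abort();
    s->bits = bits;
    live_sets++;
    return s;
}

static void
toy_free(ToySet *s)
{
    if (s == NULL)
        return;
    assert(live_sets > 0);
    free(s);
    live_sets--;
}

static bool
toy_is_member(unsigned int bit, const ToySet *s)
{
    return bit < 32 && (s->bits & (1u << bit)) != 0;
}

/*
 * Returns true if every member of the local unique-key set is also a
 * member of the remote unique-key set. Both sets are freed on every path.
 */
static bool
unique_keys_match(unsigned int local_bits, unsigned int remote_bits)
{
    ToySet *ukey = toy_make(local_bits);
    ToySet *remote = toy_make(remote_bits);
    bool safe = true;

    for (unsigned int i = 0; i < 32; i++)
    {
        if (toy_is_member(i, ukey) && !toy_is_member(i, remote))
        {
            safe = false;
            break;          /* leave the loop, but do not return yet */
        }
    }

    /* Single cleanup point, reached on both the safe and unsafe paths. */
    toy_free(ukey);
    toy_free(remote);
    return safe;
}
```

Calling toy_free() directly before the early return, as the comment suggests, would work equally well; the single exit point just keeps the cleanup in one place if more checks are added later.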