Thread: Re: Rework LogicalOutputPluginWriterUpdateProgress

Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Andres Freund
Date:
Hi,

This is a reply to:
https://www.postgresql.org/message-id/CAA4eK1%2BDB66cYRRVyGcaMm7%2BtQ_u%3Dq%3D%2BHWGjpu9X0pqMFWbsZQ%40mail.gmail.com
split off, so patches to address some of my concerns don't confuse cfbot.


On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

> > Attached is a current, quite rough, prototype. It addresses some of the points
> > raised, but far from all. There's also several XXXs/FIXMEs in it.  I changed
> > the file-ending to .txt to avoid hijacking the CF entry.
> >
> 
> I have started a separate thread to avoid such confusion. I hope that
> is fine with you.

In the abstract, yes - unfortunately just changing the subject isn't going to
suffice, I'm afraid. The In-Reply-To header was still referencing the old
thread.  The mail archive did see the threads as one, and I think that's what
cfbot uses as the source.


On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:
> >
> > Hacking on a rough prototype how I think this should rather look, I had a few
> > questions / remarks:
> >
> > - We probably need to call UpdateProgress from a bunch of places in decode.c
> >   as well? Indicating that we're lagging by a lot, just because all
> >   transactions were in another database seems decidedly suboptimal.
> >
> 
> We can do that but I think in all those cases we will reach quickly
> enough back to walsender logic (WalSndLoop - that will send keepalive
> if required) that we don't need to worry. After processing each
> record, the logic will return back to the main loop that will send
> keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices?  We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.


> Also, while reading WAL if we need to block, it will call WalSndWaitForWal()
> which will send keepalive if required.

The fast-path prevents WalSndWaitForWal() from doing that in a lot of cases.

    /*
     * Fast path to avoid acquiring the spinlock in case we already know we
     * have enough WAL available. This is particularly interesting if we're
     * far behind.
     */
    if (RecentFlushPtr != InvalidXLogRecPtr &&
        loc <= RecentFlushPtr)
        return RecentFlushPtr;
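
To make the consequence concrete, here is a small standalone model (not the
actual walsender code; wait_for_wal_model(), slow_path_housekeeping() and the
counter are stand-ins invented for illustration). It only shows that once the
cached flush position is far enough ahead, the early return skips whatever
housekeeping sits in the slow path:

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-in for the PostgreSQL type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
static int	keepalive_checks = 0;       /* how often the slow path ran */

/* Hypothetical stand-in for keepalive/reply handling in the slow path. */
static void
slow_path_housekeeping(void)
{
	keepalive_checks++;
}

/*
 * Simplified model of the function quoted above. "flushed_now" plays the
 * role of the flush position we would read under the spinlock; the real
 * function would additionally wait until that position reaches "loc".
 */
static XLogRecPtr
wait_for_wal_model(XLogRecPtr loc, XLogRecPtr flushed_now)
{
	/* Fast path: enough WAL is already known to be flushed. */
	if (RecentFlushPtr != InvalidXLogRecPtr && loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Slow path: refresh the cached position and do the housekeeping. */
	assert(flushed_now >= loc);
	RecentFlushPtr = flushed_now;
	slow_path_housekeeping();
	return RecentFlushPtr;
}
```

With a cached position of 1000, every request for an LSN up to 1000 returns
without touching the housekeeping, which is the situation when decoding is far
behind.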


> The patch calls update_progress in change_cb_wrapper and other
> wrappers which will miss the case of DDLs that generates a lot of data
> that is not processed by the plugin. I think for that we either need
> to call update_progress from reorderbuffer.c similar to what the patch
> has removed or we need some other way to address it. Do you have any
> better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
  if (!RelationIsLogicallyLogged(relation))
  if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.
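
Roughly what I mean, as a standalone sketch (RelModel, DecodeModel and
process_change() are made-up stand-ins, not reorderbuffer.c code): progress is
reported only on the two paths where the change is dropped before any plugin
callback could run.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, flattened stand-ins for Relation and ReorderBuffer state. */
typedef struct RelModel
{
	bool		logically_logged;	/* models RelationIsLogicallyLogged() */
	bool		is_rewrite_heap;	/* models rd_rel->relrewrite being set */
} RelModel;

typedef struct DecodeModel
{
	bool		output_rewrites;	/* models rb->output_rewrites */
	int			progress_updates;	/* times update_progress() would run */
	int			changes_sent;		/* changes handed to the plugin */
} DecodeModel;

static void
process_change(DecodeModel *d, const RelModel *rel)
{
	if (!rel->logically_logged ||
		(rel->is_rewrite_heap && !d->output_rewrites))
	{
		/* Skipped: the plugin never sees it, so report progress here. */
		d->progress_updates++;
		return;
	}

	/* Normal path: the callback wrapper takes care of progress. */
	d->changes_sent++;
}
```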


I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place.  I think we could find space for two flag
bits to identify the cases in the WAL, rather than needing to access the
catalog to figure it out.  If we don't find space, we could add an annotation
to the WAL record (making it bigger) for the two cases, because they're not the
path most important to optimize.



> > - Why should lag tracking only be updated at commit like points? That seems
> >   like it adds odd discontinuities?
> >
> 
> We have previously experimented to call it from non-commit locations
> but that turned out to give inaccurate information about Lag. See
> email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently.  ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.
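
A sketch of what an LSN-based, deterministic decision could look like. The
1MB interval, the LagTrackState struct and should_track_lag() are assumptions
made up for this example; nothing like this exists in the tree or the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for the PostgreSQL type */

/* Hypothetical interval: report at most once per 1MB of WAL. */
#define LAG_TRACK_LSN_INTERVAL ((XLogRecPtr) 1024 * 1024)

typedef struct LagTrackState
{
	XLogRecPtr	last_reported;	/* LSN of the last lag sample */
} LagTrackState;

/*
 * Decide whether to record a lag sample for "lsn". Records that strictly
 * need it (e.g. commits) pass must_report = true; everything else is
 * sampled purely by WAL distance, so the outcome does not depend on timing.
 */
static bool
should_track_lag(LagTrackState *st, XLogRecPtr lsn, bool must_report)
{
	assert(lsn >= st->last_reported);

	if (must_report || lsn - st->last_reported >= LAG_TRACK_LSN_INTERVAL)
	{
		st->last_reported = lsn;
		return true;
	}
	return false;
}
```

The same WAL stream always produces the same set of samples, which a
wall-clock interval cannot guarantee.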


> > - Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
> >   WalSndWriteData() missing wal_sender_timeout <= 0 checks?
> >
> 
> It seems we are checking that via
> ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
> need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.
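
For illustration, a reliable precheck could look like the following
standalone sketch (keepalive_precheck_due() and its parameters are
hypothetical names; timestamps are modelled as microseconds):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampTz;	/* microseconds, modelled after PostgreSQL */

/*
 * Return true if half of wal_sender_timeout has elapsed since the last
 * reply. A timeout <= 0 means the timeout is disabled, in which case the
 * precheck must never fire.
 */
static bool
keepalive_precheck_due(int wal_sender_timeout_ms,
					   TimestampTz last_reply, TimestampTz now)
{
	TimestampTz ping_time;

	if (wal_sender_timeout_ms <= 0)
		return false;

	/* milliseconds -> microseconds */
	ping_time = last_reply + (TimestampTz) (wal_sender_timeout_ms / 2) * 1000;
	return now >= ping_time;
}
```

Without the first check, a disabled timeout (0) yields ping_time ==
last_reply, so the condition is true on every call.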


> > - I don't really understand why f95d53eded55 added !end_xact to the if
> >   condition for ProcessPendingWrites(). Is the theory that we'll end up in an
> >   outer loop soon?
> >
> 
> Yes. For non-empty xacts, we will anyway send a commit message. For
> empty (skipped) xacts, we will send for synchronous replication case
> to avoid any delay.

That seems way too dependent on the behaviour of a specific output plugin,
there are plenty of use cases where you'd not need a separate message emitted at
commit time. With what I proposed we would know whether we just wrote
something, or not.


> > > > I don't think the syncrep logic in WalSndUpdateProgress really works as-is -
> > > > consider what happens if e.g. the origin filter filters out entire
> > > > transactions. We'll afaics never get to WalSndUpdateProgress(). In some cases
> > > > we'll be lucky because we'll return quickly to XLogSendLogical(), but not
> > > > reliably.
> > >
> 
> Which case are you worried about? As mentioned in one of the previous
> points I thought the timeout/keepalive handling in the callers should
> be enough.

Well, you added syncrep specific logic to WalSndUpdateProgress(). The same
logic isn't present in the higher level loops. If we do need that logic, we
also need to trigger it if the origin filter filters out the entire
transaction. If we don't need it, then we shouldn't have it in
WalSndUpdateProgress() either.


> How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
> WalSndWaitToSendPendingWrites?

I don't like those much:

We're not really waiting for the data to be sent or such, we just want to give
it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
actually are waiting for something to complete.

I don't think 'write' is a great description either, although our existing
terminology is somewhat muddled. We keep calling pq_flush() until
!pq_is_send_pending().

WalSndSendPending() or WalSndFlushPending()?
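
The shape of that loop, as a standalone model (SendBufModel and the helper
names are invented for illustration; the real loop also processes replies,
checks timeouts and waits on the socket between attempts):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the outgoing libpq buffer. */
typedef struct SendBufModel
{
	size_t		pending;		/* bytes not yet handed to the kernel */
	size_t		kernel_accepts;	/* bytes the kernel takes per attempt */
	int			flush_calls;	/* number of flush attempts made */
} SendBufModel;

/* Models pq_is_send_pending(). */
static bool
send_pending(const SendBufModel *buf)
{
	return buf->pending > 0;
}

/* Models a single flush attempt that may only drain part of the buffer. */
static void
flush_once(SendBufModel *buf)
{
	size_t		n = buf->pending < buf->kernel_accepts ?
		buf->pending : buf->kernel_accepts;

	buf->pending -= n;
	buf->flush_calls++;
}

/* Hand everything to the kernel; we do not wait for it to reach the peer. */
static void
flush_pending_model(SendBufModel *buf)
{
	assert(buf->kernel_accepts > 0);	/* otherwise the loop cannot end */

	while (send_pending(buf))
		flush_once(buf);
}
```

Note that the loop ends as soon as the buffer is empty, regardless of whether
the peer has received anything, hence "flush" or "send" rather than "wait".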

Greetings,

Andres Freund



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Andres Freund
Date:
Hi,

Replying on the new thread. Original message at
https://www.postgresql.org/message-id/CAA4eK1%2BH2m95HhzfpRkwv2-GtFwtbcVp7837X49%2Bvs0RXX3dBA%40mail.gmail.com


On 2023-02-09 15:54:19 +0530, Amit Kapila wrote:
> One thing to note about the changes we are discussing here is that
> some of the plugins like wal2json already call
> OutputPluginUpdateProgress in their commit callback. They may need to
> update it accordingly.

It was a fundamental mistake to add OutputPluginUpdateProgress(). I don't like
causing unnecessary breakage, but this seems necessary.


> One difference I see with the patch is that I think we will end up
> sending keepalive for empty prepared transactions even though we don't
> skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

Likely WalSndUpdateProgress() should not do anything if
  did_write && !finished_xact.
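
Spelled out as a tiny standalone helper (progress_update_needed() is a
hypothetical name, only meant to pin down the condition):

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Nothing to do if the callback just wrote data and the transaction is not
 * finished: the write itself already reached the client. In every other
 * combination a progress update / keepalive check is warranted.
 */
static bool
progress_update_needed(bool did_write, bool finished_xact)
{
	if (did_write && !finished_xact)
		return false;

	return true;
}
```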


> The reason why we don't skip sending prepare for empty 2PC xacts is that if
> the WALSender restarts after the PREPARE of a transaction and before the
> COMMIT PREPARED of the same transaction then we won't be able to figure out
> if we have skipped sending BEGIN/PREPARE of a transaction.

It's probably not a good idea to skip sending 2PC state changes anyway, at
least when used for replication, rather than CDC type use cases.

But I again think that that's not something the core system can assume.

I'm sad that we went so far down a pretty obviously bad rabbit hole.  Adding
incrementally more of the progress calls to pgoutput, and knowing that
wal2json also added some, should have rung some pretty large alarm bells.


> To skip sending prepare for empty xacts, we previously thought of some ideas
> like (a) At commit-prepare time have a check on the subscriber-side to know
> whether there is a corresponding prepare for it before actually doing
> commit-prepare but that sounded costly. (b) somehow persist the information
> whether the PREPARE for a xact is already sent and then use that information
> for commit prepared but again that also didn't sound like a good idea.

I don't think it's worth optimizing this. However, the explanation for why
we're not skipping empty prepared xacts needs to be added to
pgoutput_prepare_txn() etc.

Greetings,

Andres Freund



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Amit Kapila
Date:
On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:
>
> > One difference I see with the patch is that I think we will end up
> > sending keepalive for empty prepared transactions even though we don't
> > skip sending begin/prepare messages for those.
>
> With the proposed approach we reliably know whether a callback wrote
> something, so we can tune the behaviour here fairly easily.
>

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn. Is the intention here that we still track whether
BEGIN () has been sent via pgoutput?

-- 
With Regards,
Amit Kapila.



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Amit Kapila
Date:
On Sat, Feb 11, 2023 at 2:34 AM Andres Freund <andres@anarazel.de> wrote:
>
> On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> > On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:
> > >
> > > Hacking on a rough prototype how I think this should rather look, I had a few
> > > questions / remarks:
> > >
> > > - We probably need to call UpdateProgress from a bunch of places in decode.c
> > >   as well? Indicating that we're lagging by a lot, just because all
> > >   transactions were in another database seems decidedly suboptimal.
> > >
> >
> > We can do that but I think in all those cases we will reach quickly
> > enough back to walsender logic (WalSndLoop - that will send keepalive
> > if required) that we don't need to worry. After processing each
> > record, the logic will return back to the main loop that will send
> > keepalive if required.
>
> For keepalive processing yes, for syncrep and accurate lag tracking, I don't
> think that suffices?  We could do that in WalSndLoop() instead I guess, but
> we'd have more information about when that's useful in decode.c.
>

Yeah, I think one possibility to address that is to call
update_progress() in DecodeCommit() and friends when we need to skip
the xact. We decide that in DecodeTXNNeedSkip. In the checks in that
function, I am not sure whether we need to call it for the case where
we skip the xact because we decide that it was previously decoded.

>
> > The patch calls update_progress in change_cb_wrapper and other
> > wrappers which will miss the case of DDLs that generates a lot of data
> > that is not processed by the plugin. I think for that we either need
> > to call update_progress from reorderbuffer.c similar to what the patch
> > has removed or we need some other way to address it. Do you have any
> > better idea?
>
> I don't mind calling something like update_progress() in the specific cases
> that's needed, but I think those are just the
>   if (!RelationIsLogicallyLogged(relation))
>   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
>
> To me it makes a lot more sense to call update_progress() for those, rather
> than generally.
>

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

>
> I think, independent of the update_progress calls, it'd be worth investing a
> bit of time into optimizing those cases, so that we don't put the changes into
> the reorderbuffer in the first place.  I think we find space for two flag bits
> to identify the cases in the WAL, rather than needing to access the catalog to
> figure it out.  If we don't find space, we could add an annotation the WAL
> record (making it bigger) for the two cases, because they're not the path most
> important to optimize.
>
>
>
> > > - Why should lag tracking only be updated at commit like points? That seems
> > >   like it adds odd discontinuities?
> > >
> >
> > We have previously experimented to call it from non-commit locations
> > but that turned out to give inaccurate information about Lag. See
> > email [1].
>
> That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
> reporting something more frequently.  ISTM that
> WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
> update lag reporting for records that don't strictly need it. I think that
> decision should be made based on the LSN, and be deterministic.
>
>
> > > - Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
> > >   WalSndWriteData() missing wal_sender_timeout <= 0 checks?
> > >
> >
> > It seems we are checking that via
> > ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
> > need to check it before as well?
>
> Either we don't need the precheck at all, or we should do it reliably. Right
> now we'll have a higher overhead / some behavioural changes, if
> wal_sender_timeout is disabled. That doesn't make sense.
>

Fair enough, we can probably do it earlier.

>
> > How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
> > WalSndWaitToSendPendingWrites?
>
> I don't like those much:
>
> We're not really waiting for the data to be sent or such, we just want to give
> it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
> actually are waiting for something to complete.
>
> I don't think 'write' is a great description either, although our existing
> terminology is somewhat muddled. We're waiting calling pq_flush() until
> !pq_is_send_pending().
>
> WalSndSendPending() or WalSndFlushPending()?
>

Either of those sounds fine.

-- 
With Regards,
Amit Kapila.



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Andres Freund
Date:
Hi,

On 2023-02-13 08:22:34 +0530, Amit Kapila wrote:
> On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:
> >
> > > One difference I see with the patch is that I think we will end up
> > > sending keepalive for empty prepared transactions even though we don't
> > > skip sending begin/prepare messages for those.
> >
> > With the proposed approach we reliably know whether a callback wrote
> > something, so we can tune the behaviour here fairly easily.
> >
> 
> I would like to clarify a few things about the proposed approach. In
> commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
> ctx->did_write = false;, then call the commit/prepare callback (which
> will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
> update_progress() which will make decisions based on ctx->did_write
> flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
> should know that the transaction has performed some writes before that
> call which is currently working because pgoutput is tracking the same
> via sent_begin_txn.

I don't really see these as being related. What pgoutput does internally to
optimize for some usecases shouldn't matter to the larger infrastructure.


> Is the intention here that we still track whether BEGIN () has been sent via
> pgoutput?

Yes. If somebody later wants to propose tracking this alongside a txn and
passing that to the output plugin callbacks, we can do that. But that's
independent of fixing the broken architecture of the progress infrastructure.

Greetings,

Andres Freund



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Andres Freund
Date:
Hi,

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > The patch calls update_progress in change_cb_wrapper and other
> > > wrappers which will miss the case of DDLs that generates a lot of data
> > > that is not processed by the plugin. I think for that we either need
> > > to call update_progress from reorderbuffer.c similar to what the patch
> > > has removed or we need some other way to address it. Do you have any
> > > better idea?
> >
> > I don't mind calling something like update_progress() in the specific cases
> > that's needed, but I think those are just the
> >   if (!RelationIsLogicallyLogged(relation))
> >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> >
> > To me it makes a lot more sense to call update_progress() for those, rather
> > than generally.
> >
> 
> Won't it be better to call it wherever we don't invoke any wrapper
> function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> changes, etc.? I was thinking that wherever we don't call the wrapper
> function which means we don't have a chance to invoke
> update_progress(), the timeout can happen if there are a lot of such
> messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

Greetings,

Andres Freund



RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:
> On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > > The patch calls update_progress in change_cb_wrapper and other
> > > > wrappers which will miss the case of DDLs that generates a lot of data
> > > > that is not processed by the plugin. I think for that we either need
> > > > to call update_progress from reorderbuffer.c similar to what the patch
> > > > has removed or we need some other way to address it. Do you have any
> > > > better idea?
> > >
> > > I don't mind calling something like update_progress() in the specific cases
> > > that's needed, but I think those are just the
> > >   if (!RelationIsLogicallyLogged(relation))
> > >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> > >
> > > To me it makes a lot more sense to call update_progress() for those, rather
> > > than generally.
> > >
> >
> > Won't it be better to call it wherever we don't invoke any wrapper
> > function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> > changes, etc.? I was thinking that wherever we don't call the wrapper
> > function which means we don't have a chance to invoke
> > update_progress(), the timeout can happen if there are a lot of such
> > messages.
> 
> ISTM that the likelihood of causing harm due to increased overhead is higher
> than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Regards,
Wang wei

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Sun, Feb 19, 2023 at 21:06 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:
> On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:
> > On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > > > The patch calls update_progress in change_cb_wrapper and other
> > > > > wrappers which will miss the case of DDLs that generates a lot of data
> > > > > that is not processed by the plugin. I think for that we either need
> > > > > to call update_progress from reorderbuffer.c similar to what the patch
> > > > > has removed or we need some other way to address it. Do you have any
> > > > > better idea?
> > > >
> > > > I don't mind calling something like update_progress() in the specific cases
> > > > that's needed, but I think those are just the
> > > >   if (!RelationIsLogicallyLogged(relation))
> > > >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> > > >
> > > > To me it makes a lot more sense to call update_progress() for those, rather
> > > > than generally.
> > > >
> > >
> > > Won't it be better to call it wherever we don't invoke any wrapper
> > > function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> > > changes, etc.? I was thinking that wherever we don't call the wrapper
> > > function which means we don't have a chance to invoke
> > > update_progress(), the timeout can happen if there are a lot of such
> > > messages.
> >
> > ISTM that the likelihood of causing harm due to increased overhead is higher
> > than the gain.
> 
> I would like to do something for this thread. So, I am planning to update the
> patch as per discussion in the email chain unless someone is already working on
> it.

Thanks to Andres and Amit for the discussion.

Based on the discussion and Andres' WIP(in [1]), I made the following
modifications:
1. Renamed some functions.
2. Added the threshold-related logic in the function
update_progress_and_keepalive.
3. Added the timeout-related processing of temporary data and
unlogged/foreign/system tables in the function ReorderBufferProcessTXN.
4. Improved error messages in the function OutputPluginPrepareWrite.
5. Invoked the function update_progress_and_keepalive in the functions
DecodeCommit(), DecodePrepare() and ReorderBufferAbort() to fix sync-related
problems caused by filters such as origin.
6. Removed the invocation of the function update_progress_and_keepalive in the
function begin_prepare_cb_wrapper().
7. Invoked the function update_progress_and_keepalive() in the function
stream_truncate_cb_wrapper(), just like we do in the function
truncate_cb_wrapper().
8. Removed the check of SyncRepRequested() in the syncrep logic in the function
WalSndUpdateProgressAndKeepAlive().
9. Added the check for wal_sender_timeout before using it in the functions
WalSndUpdateProgressAndKeepAlive() and WalSndWriteData().

Attached is the new patch.

[1] - https://www.postgresql.org/message-id/20230208200235.esfoggsmuvf4pugt%40awork3.anarazel.de

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Wang,

Thank you for making the patch. IIUC, with your patch output plugins basically
no longer have to call UpdateProgress.

I think the basic approach is as follows; is that right?

1. In *_cb_wrapper, set ctx->did_write to false
2. In OutputPluginWrite() set ctx->did_write to true.
   This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
   Even if we are not at the end, check skipped count and call the function if needed.
   The counter will be reset if ctx->did_write is true or we exceed the threshold.
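
To check my understanding, I wrote the three steps down as a standalone
model. CtxModel, SKIP_THRESHOLD and the two example callbacks are made up for
this sketch, and the function bodies are my reading of the approach, not the
code in the patch:

```c
#include <assert.h>
#include <stdbool.h>

#define SKIP_THRESHOLD 100		/* hypothetical value */

typedef struct CtxModel
{
	bool		did_write;			/* set when the plugin wrote something */
	int			skipped_changes;	/* consecutive changes without a write */
	int			progress_calls;		/* times progress/keepalive ran */
} CtxModel;

typedef void (*change_cb) (CtxModel *ctx);

/* Step 2: models OutputPluginWrite(). */
static void
output_plugin_write(CtxModel *ctx)
{
	ctx->did_write = true;
}

/* Models update_progress_and_keepalive(); here it only counts calls. */
static void
update_progress_and_keepalive(CtxModel *ctx, bool finished_xact)
{
	(void) finished_xact;
	ctx->progress_calls++;
}

/* Counter is reset on a real write or when the threshold is reached. */
static bool
is_skip_threshold_change(CtxModel *ctx)
{
	if (ctx->did_write)
	{
		ctx->skipped_changes = 0;
		return false;
	}
	if (++ctx->skipped_changes >= SKIP_THRESHOLD)
	{
		ctx->skipped_changes = 0;
		return true;
	}
	return false;
}

/* Steps 1 and 3 for a change: reset the flag, run the plugin, then check. */
static void
change_cb_wrapper(CtxModel *ctx, change_cb cb)
{
	ctx->did_write = false;
	cb(ctx);
	if (is_skip_threshold_change(ctx))
		update_progress_and_keepalive(ctx, false);
}

/* Step 3 at end of transaction: always update, counter left untouched. */
static void
commit_cb_wrapper(CtxModel *ctx)
{
	ctx->did_write = false;
	update_progress_and_keepalive(ctx, true);
}

/* Example plugin callbacks: one filters everything, one writes everything. */
static void skip_all_cb(CtxModel *ctx) { (void) ctx; }
static void write_all_cb(CtxModel *ctx) { output_plugin_write(ctx); }
```

In this model, 250 consecutive skipped changes trigger two progress calls and
leave the counter at 50, and the commit does not reset it.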

The following are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                                                                 bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may not be needed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(), since keepalive is not a verb.
(But it's ok to ignore this if you prefer the shorter name.)
The same can be said for the names of the datatype and the callback.

03. UpdateDecodingProgressAndKeepalive

```
+       /* set output state */
+       ctx->accept_writes = false;
+       ctx->write_xid = xid;
+       ctx->write_location = lsn;
+       ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These values are not used in WalSndUpdateProgressAndKeepalive().

04. stream_abort_cb_wrapper

```
+       update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct to call update_progress_and_keepalive() with
finished_xact = true? Isn't there a possibility that a streamed sub-transaction is aborted?


05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called directly.
Don't we have to reset change_count here?

06. ReorderBufferAbort

Assume that the top transaction is aborted. At that time update_progress_and_keepalive()
is called in stream_abort_cb_wrapper(), and then WalSndUpdateProgressAndKeepalive()
is called at the end of ReorderBufferAbort(). Do we have to do it in both?
I think the call in stream_abort_cb_wrapper() may not be needed.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may still be strange
because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary() in that case? Or a better name may be needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED




RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Thur, Feb 23, 2023 at 18:41 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> Dear Wang,
> 
> Thank you for making the patch. IIUC your patch basically can achieve that
> output plugin
> does not have to call UpdateProgress.

Thanks for your review and comments.

> I think the basic approach is as follows, is it right?
> 
> 1. In *_cb_wrapper, set ctx->did_write to false
> 2. In OutputPluginWrite() set ctx->did_write to true.
>    This means that changes are really written, not skipped.
> 3. At the end of the transaction, call update_progress_and_keepalive().
>    Even if we are not at the end, check skipped count and call the function if
> needed.
>    The counter will be reset if ctx->did_write is true or we exceed the threshold.

Yes, you are right.
For the reset of the counter, please also refer to the reply to #05.

> Followings are my comments. I apologize if I missed some previous discussions.
> 
> 01. logical.c
> 
> ```
> +static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
> +                                                                                 bool finished_xact);
> +
> +static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
> ```
> 
> "struct" may be not needed.

Removed.

> 02. UpdateDecodingProgressAndKeepalive
> 
> I think the name should be UpdateDecodingProgressAndSendKeepalive(),
> keepalive is not verb.
> (But it's ok to ignore if you prefer the shorter name)
> Same thing can be said for the name of datatype and callback.

Yes, I prefer the shorter one. Otherwise, I think some names would be longer.

> 03. UpdateDecodingProgressAndKeepalive
> 
> ```
> +       /* set output state */
> +       ctx->accept_writes = false;
> +       ctx->write_xid = xid;
> +       ctx->write_location = lsn;
> +       ctx->did_write = false;
> ```
> 
> Do we have to modify accept_writes, write_xid, and write_location here?
> These value is not used in WalSndUpdateProgressAndKeepalive().

I think it might be better to set these three flags.
Since LogicalOutputPluginWriterUpdateProgressAndKeepalive is an open callback, I
think setting write_xid and write_location is not just for the function
WalSndUpdateProgressAndKeepalive. And I think setting accept_writes could
prevent some wrong usage.

> 04. stream_abort_cb_wrapper
> 
> ```
> +       update_progress_and_keepalive(ctx, true)
> ```
> 
> I'm not sure, but is it correct to call update_progress_and_keepalive() with
> finished_xact = true? Isn't there a possibility that a streamed sub-transaction is
> aborted?

Fixed.

> 05. is_skip_threshold_change
> 
> At the end of the transaction, update_progress_and_keepalive() is called directly.
> Don't we have to reset change_count here?

I think this might complicate the function is_skip_threshold_change, so I didn't
reset the counter in this case.
I think the worst case of not resetting the counter is to delay sending the
keepalive message for the next transaction. But since the threshold we're using
is safe enough, it seems fine to me not to reset the counter in this case.
Added these related comments in the function is_skip_threshold_change.

> 06. ReorderBufferAbort
> 
> Assume that the top transaction is aborted. At that time
> update_progress_and_keepalive()
> is called in stream_abort_cb_wrapper(), and then
> WalSndUpdateProgressAndKeepalive()
> is called at the end of ReorderBufferAbort(). Do we have to do it in both?
> I think the call in stream_abort_cb_wrapper() may not be needed.

Yes, I think we only need one call for this case.
To make the behavior in *_cb_wrapper look consistent, I avoided the second call
for this case in the function ReorderBufferAbort.

> 07. WalSndUpdateProgress
> 
> You renamed ProcessPendingWrites() to WalSndSendPending(), but it may
> still be strange
> because it will be called even if there are no pending writes.
> 
> Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
> (at least) WalSndKeepaliveIfNecessary() in that case? Or a better name may be
> needed.

I think after sending the keepalive message (in WalSndKeepaliveIfNecessary), we
need to make sure the pending data is flushed through the loop.

Attached is the new patch.

Regards,
Wang wei


Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Peter Smith
Date:
Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}
}
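
For illustration, here is a minimal self-contained sketch of that push-down.
DemoContext, the demo_* names and the per-context counter are hypothetical
stand-ins (the patch keeps a static counter and uses the real
LogicalDecodingContext), so please read it as an outline of the idea rather
than the actual code:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define CHANGES_THRESHOLD 100

/* Hypothetical, cut-down stand-in for LogicalDecodingContext. */
typedef struct DemoContext DemoContext;
typedef void (*DemoUpdateProgressCB) (DemoContext *ctx, bool finished_xact);

struct DemoContext
{
    bool        did_write;      /* did the last callback send data? */
    int         changes_count;  /* changes skipped since the last send */
    int         updates_sent;   /* how often the callback fired (demo only) */
    DemoUpdateProgressCB update_progress_and_keepalive;
};

/* Returns true once per CHANGES_THRESHOLD changes that were not sent. */
bool
demo_threshold_exceeded(DemoContext *ctx)
{
    if (ctx->did_write)
    {
        ctx->changes_count = 0;
        return false;
    }

    if (++ctx->changes_count >= CHANGES_THRESHOLD)
    {
        ctx->changes_count = 0;
        return true;
    }

    return false;
}

/* The guard lives in the wrapper, so callers never repeat it. */
void
demo_update_progress_and_keepalive(DemoContext *ctx, bool finished_xact)
{
    assert(ctx != NULL);

    if (ctx->update_progress_and_keepalive == NULL)
        return;

    if (finished_xact || demo_threshold_exceeded(ctx))
        ctx->update_progress_and_keepalive(ctx, finished_xact);
}

/* Demo callback: only counts how often it was invoked. */
void
demo_count_update(DemoContext *ctx, bool finished_xact)
{
    (void) finished_xact;
    ctx->updates_sent++;
}
```

With the guard inside the wrapper, each call site reduces to a single
unconditional call. Note that a finished transaction short-circuits the
threshold check, so the counter is left untouched in that case.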


~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

5. CreateInitDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
  if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" be more appropriate here?

~~~

8. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

~~~


9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc) because it seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
*  changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required.  Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)

~~~

11. update_progress_and_keepalive

/*
 * Update progress tracking and send keep alive (if required).
 */
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
  bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)


======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
     now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
    wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.
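
To make the two branches of that condition explicit, here is a small
standalone sketch. The function name and the millisecond timestamp type are
hypothetical simplifications (the real code uses TimestampTz and
TimestampTzPlusMilliseconds), so it only mirrors the shape of the test:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz, expressed in milliseconds here. */
typedef int64_t DemoTimestampMs;

/*
 * Returns true when writes are pending, or when we are mid-transaction with
 * timeouts enabled and half of the timeout has elapsed since the last reply
 * from the receiver.
 */
bool
demo_should_send_pending(bool pending_writes, bool finished_xact,
                         int wal_sender_timeout_ms,
                         DemoTimestampMs last_reply, DemoTimestampMs now)
{
    assert(wal_sender_timeout_ms >= 0);

    if (pending_writes)
        return true;

    return !finished_xact &&
        wal_sender_timeout_ms > 0 &&
        now >= last_reply + wal_sender_timeout_ms / 2;
}
```

The second branch can be true while pending_writes is false, which is the
case where the callee runs with nothing "pending".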


------
Kind Regards,
Peter Smith.
Fujitsu Australia



RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Takamichi Osumi (Fujitsu)"
Date:
Hi,


On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com <wangw.fnst@fujitsu.com> wrote:
> Attach the new patch.
Thanks for sharing v3. Minor review comments and question.


(1) UpdateDecodingProgressAndKeepalive header comment

The comment should be updated to explain, perhaps, why we reset some other flags as discussed in [1], and to describe
simply the function's update-progress and keepalive functionality.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
        if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except
startup, shutdown, filter_by_origin and filter_prepare callbacks)");

I think you could break the error message across two string literals, or rephrase it.
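
As a rough sketch of the first option: adjacent string literals in C are
joined at compile time, so the message can be wrapped in the source without
changing the resulting text. The function name below is hypothetical and only
exists to make the example standalone:

```c
#include <assert.h>
#include <string.h>

/* Returns the full message; the literals below form one single string. */
const char *
demo_writes_not_accepted_msg(void)
{
    const char *msg =
        "writes are only accepted in callbacks in the "
        "OutputPluginCallbacks structure (except startup, shutdown, "
        "filter_by_origin and filter_prepare callbacks)";

    /* Wrapping the literal in the source adds no newline to the text. */
    assert(strchr(msg, '\n') == NULL);

    return msg;
}
```

Each piece has to carry its own trailing space, since nothing is inserted
between the joined literals.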

(3) Minor question

The patch introduced goto statements into the cb_wrapper functions. Is the purpose to call
update_progress_and_keepalive after popping the error stack, even if the corresponding callback is missing? I thought we
could just add an "else" clause to the check for the callback's existence, but did you choose the current goto style for
readability?

(4) Name of is_skip_threshold_change

I also feel the name of is_skip_threshold_change can be changed to "exceeded_keepalive_threshold" or something. Other
candidates are proposed by Peter-san in [2].



[1] -
https://www.postgresql.org/message-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[2] - https://www.postgresql.org/message-id/CAHut%2BPt3ZEMo-KTF%3D5KJSU%2BHdWJD19GPGGCKOmBeM47484Ychw%40mail.gmail.com


Best Regards,
    Takamichi Osumi




RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some comments for the v2-0001 patch.
> 
> (I haven't looked at the v3 that was posted overnight; maybe some of
> my comments have already been addressed.)

Thanks for your comments.

> ======
> General
> 
> 1. (Info from the commit message)
> Since we can know whether the change is an end of transaction change in the
> common code, we removed the LogicalDecodingContext->end_xact introduced
> in
> commit f95d53e.
> 
> ~
> 
> TBH, it was not clear to me that this change was an improvement. IIUC,
> it removes the "unnecessary" member, but only does that by replacing
> it everywhere with a boolean parameter passed to
> update_progress_and_keepalive(). So the end result seems no less code,
> but it is less readable code now because you need to know what the
> true/false parameter means. I wonder if it would have been better just
> to leave this how it was.

Since the parameter name of the function tells us what the input means, I think
both approaches are fine. But the approach in the current patch removes a member
from the structure, so I think this modification is good.

> ======
> src/backend/replication/logical/logical.c
> 
> 2. General - blank lines
> 
> There are multiple places in this file where the patch removed some
> statements but left blank lines. The result is 2 blank lines remaining
> instead of one.
> 
> see change_cb_wrapper.
> see truncate_cb_wrapper.
> see stream_start_cb_wrapper.
> see stream_stop_cb_wrapper.
> see stream_change_cb_wrapper.
> 
> e.g.
> 
> BEFORE
> ctx->write_location = last_lsn;
> 
> ctx->end_xact = false;
> 
> /* in streaming mode, stream_stop_cb is required */
> 
> AFTER (now there are 2 blank lines)
> ctx->write_location = last_lsn;
> 
> 
> /* in streaming mode, stream_stop_cb is required */

Removed.

> ~~~
> 
> 3. General - calls to is_skip_threshold_change()
> 
> + if (is_skip_threshold_change(ctx))
> + update_progress_and_keepalive(ctx, false);
> 
> There are multiple calls like this, which are guarding the
> update_progress_and_keepalive() with the is_skip_threshold_change()
> - See truncate_cb_wrapper
> - See message_cb_wrapper
> - See stream_change_cb_wrapper
> - See stream_message_cb_wrapper
> - See stream_truncate_cb_wrapper
> - See UpdateDecodingProgressAndKeepalive
> 
> IIUC, then I was thinking all those conditions maybe can be pushed
> down *into* the wrapper, thereby making every calling code simpler.
> 
> e.g. make the wrapper function code look similar to the current
> UpdateDecodingProgressAndKeepalive:
> 
> BEFORE (update_progress_and_keepalive)
> {
> if (!ctx->update_progress_and_keepalive)
> return;
> 
> ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>    ctx->write_xid, ctx->did_write,
>    finished_xact);
> }
> AFTER
> {
> if (!ctx->update_progress_and_keepalive)
> return;
> 
> if (finished_xact || is_skip_threshold_change(ctx))
> {
> ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>    ctx->write_xid, ctx->did_write,
>    finished_xact);
> }
> }

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

> ~~~
> 
> 4. StartupDecodingContext
> 
> @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
>     XLogReaderRoutine *xl_routine,
>     LogicalOutputPluginWriterPrepareWrite prepare_write,
>     LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPluginWriterUpdateProgress update_progress)
> +   LogicalOutputPluginWriterUpdateProgressAndKeepalive
> update_progress_and_keepalive)
> 
> TBH, I find it confusing that the new parameter name
> ('update_progress_and_keepalive') is identical to the static function
> name in the same C source file. It introduces a kind of unnecessary
> shadowing and makes it harder to search/read the code.
> 
> I suggest just calling this param something unique and local to the
> function like 'do_update_keepalive'.
> 
> ~~~
> 5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
>     XLogReaderRoutine *xl_routine,
>     LogicalOutputPluginWriterPrepareWrite prepare_write,
>     LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPluginWriterUpdateProgress update_progress)
> +   LogicalOutputPluginWriterUpdateProgressAndKeepalive
> update_progress_and_keepalive)
> 
> (Ditto previous comment #4)
> 
> TBH, I find it confusing that the new parameter name
> ('update_progress_and_keepalive') is identical to the static function
> name in the same C source file. It introduces a kind of unnecessary
> shadowing and makes it harder to search/read the code.
> 
> I suggest just calling this param something unique and local to the
> function like 'do_update_keepalive'.
> ~~~
> 
> 6. CreateDecodingContext
> 
> @@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
>     XLogReaderRoutine *xl_routine,
>     LogicalOutputPluginWriterPrepareWrite prepare_write,
>     LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPluginWriterUpdateProgress update_progress)
> +   LogicalOutputPluginWriterUpdateProgressAndKeepalive
> update_progress_and_keepalive)
> 
> (Ditto previous comment #4)
> 
> TBH, I find it confusing that the new parameter name
> ('update_progress_and_keepalive') is identical to the static function
> name in the same C source file. It introduces a kind of unnecessary
> shadowing and makes it harder to search/read the code.
> 
> I suggest just calling this param something unique and local to the
> function like 'do_update_keepalive'.

I'm not sure if 'do_update_keepalive' is accurate. So, to distinguish this
function from the parameter, I renamed the function to
'UpdateProgressAndKeepalive'.

> ~~~
> 
> 7. OutputPluginPrepareWrite
> 
> @@ -662,7 +657,7 @@ void
>  OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
>  {
>   if (!ctx->accept_writes)
> - elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
> + elog(ERROR, "writes are only accepted in callbacks in the
> OutputPluginCallbacks structure (except startup, shutdown,
> filter_by_origin and filter_prepare callbacks)");
> 
> It seems a confusing error message. Can it be worded better?

I tried to improve this message in the new patch. Do you have any suggestions to
improve it?

> Also, I
> noticed this flag is never used except in this one place where it
> throws an error, so would an "Assert" would be more appropriate here?

I'm not sure if we should change errors to assertions here.

> ~~~
> 
> 8. rollback_prepared_cb_wrapper
> 
>   /*
>   * If the plugin support two-phase commits then rollback prepared callback
>   * is mandatory
> + *
> + * FIXME: This should have been caught much earlier.
>   */
>   if (ctx->callbacks.rollback_prepared_cb == NULL)
> ~
> Is this FIXME related to the current patch, or should this be an
> entirely different topic?

I think this FIXME seems to be another topic and I will delete this FIXME later.

> ~~~
> 
> 
> 9. is_skip_threshold_change
> 
> The current usage for this function is like:
> 
> if (is_skip_threshold_change(ctx))
> + update_progress_and_keepalive(ctx, false);
> 
> ~
> 
> IMO a better name for this function might be like
> 'is_change_threshold_exceeded()' (or
> 'is_keepalive_threshold_exceeded()' etc) because seems more readable
> to say
> 
> if (is_change_threshold_exceeded())
> do_something();

Renamed this function to is_keepalive_threshold_exceeded.

> ~~~
> 
> 10. is_skip_threshold_change
> 
> static bool
> is_skip_threshold_change(struct LogicalDecodingContext *ctx)
> {
> static int changes_count = 0; /* used to accumulate the number of
> *  changes */
> 
> /* If the change was published, reset the counter and return false */
> if (ctx->did_write)
> {
> changes_count = 0;
> return false;
> }
> 
> /*
> * It is possible that the data is not sent to downstream for a long time
> * either because the output plugin filtered it or there is a DDL that
> * generates a lot of data that is not processed by the plugin. So, in
> * such cases, the downstream can timeout. To avoid that we try to send a
> * keepalive message if required.  Trying to send a keepalive message
> * after every change has some overhead, but testing showed there is no
> * noticeable overhead if we do it after every ~100 changes.
> */
> #define CHANGES_THRESHOLD 100
> if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
> {
> changes_count = 0;
> return true;
> }
> 
> return false;
> }
> 
> ~
> 
> That 2nd condition checking if (!ctx->did_write && ++changes_count >=
> CHANGES_THRESHOLD) does not seem right. There is no need to check the
> ctx->did_write; it must be false because it was checked earlier in the
> function:
> 
> BEFORE
> if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
> 
> SUGGESTION1
> Assert(!ctx->did_write);
> if (++changes_count >= CHANGES_THRESHOLD)
> 
> SUGGESTION2
> if (++changes_count >= CHANGES_THRESHOLD)

Fixed.
I think the second suggestion looks better to me.

> ~~~
> 
> 11. update_progress_and_keepalive
> 
> /*
>  * Update progress tracking and send keep alive (if required).
>  */
> static void
> update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
>   bool finished_xact)
> {
> if (!ctx->update_progress_and_keepalive)
> return;
> 
> ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>    ctx->write_xid, ctx->did_write,
>    finished_xact);
> }
> 
> ~
> 
> Maybe it's simpler to code this without the return.
> 
> e.g.
> 
> if (ctx->update_progress_and_keepalive)
> {
> ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>    ctx->write_xid, ctx->did_write,
>    finished_xact);
> }
> 
> (it is just generic suggested code for example -- I made some other
> review comments overlapping this)

I think these two approaches are fine. But because I think the approach in the
current patch is consistent with the style of other functions, I didn't change
it.

> ======
> .../replication/logical/reorderbuffer.c
> 
> 12. ReorderBufferAbort
> 
> + UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
> +    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
> +
> 
> I didn't really recognise how the
> "!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
> 'finished_xact' param. Can this call have an explanatory comment about
> how it works?

Using txn->toplevel_xid to check whether this is a top-level transaction does
seem confusing. Because the comment on txn->toptxn explains what its value
means, I updated the patch to use txn->toptxn for this check.
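
A minimal sketch of the idea, assuming a parent pointer that is NULL for a
top-level transaction. DemoTXN and demo_abort_finishes_xact are hypothetical
names used only for this illustration, not the ReorderBufferTXN definition:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, cut-down stand-in for ReorderBufferTXN. */
typedef struct DemoTXN
{
    unsigned int xid;
    struct DemoTXN *toptxn;     /* parent transaction; NULL if top-level */
} DemoTXN;

/*
 * Aborting a subtransaction leaves the enclosing transaction running, so
 * only the abort of a top-level transaction counts as a finished one.
 */
bool
demo_abort_finishes_xact(const DemoTXN *txn)
{
    assert(txn != NULL);

    return txn->toptxn == NULL;
}
```

The result of such a check is what would be passed as the 'finished_xact'
argument.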

> ======
> src/backend/replication/walsender.c
> ~~~
> 
> 13. WalSndUpdateProgressAndKeepalive
> 
> - if (pending_writes || (!end_xact &&
> + if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
>      now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
>     wal_sender_timeout / 2)))
> - ProcessPendingWrites();
> + WalSndSendPending();
> 
> Is this new function name OK to be WalSndSendPending? From this code,
> we can see it can also be called in other scenarios even when there is
> nothing "pending" at all.

I think this function is used to flush pending data or send a keepalive message.
But I'm not sure if we should add a keepalive-related string to the function
name, since that would make the name too long.

Attached is the new patch.

Regards,
Wang wei

Attachment

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Tues, Feb 28, 2023 at 11:31 AM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:
> Hi,
> 
> 
> On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> > Attach the new patch.
> Thanks for sharing v3. Minor review comments and question.

Thanks for your comments.

> (1) UpdateDecodingProgressAndKeepalive header comment
> 
> The comment should be updated to explain maybe why we reset some other
> flags as discussed in [1] and the functionality to update and keepalive of the
> function simply.

Added comments atop the function UpdateDecodingProgressAndKeepalive about
when to call it.

> (2) OutputPluginPrepareWrite
> 
> Probably the changed error string is too wide.
> 
> @@ -662,7 +657,7 @@ void
>  OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
>  {
>         if (!ctx->accept_writes)
> -               elog(ERROR, "writes are only accepted in commit, begin and change
> callbacks");
> +               elog(ERROR, "writes are only accepted in callbacks in the
> OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and
> filter_prepare callbacks)");
> 
> I thought you can break the error message into two string lines. Or, you can
> rephrase it to different expression.

I tried to improve this message and broke it into two lines in the new patch.

> (3) Minor question
> 
> The patch introduced the goto statements into the cb_wrapper functions. Is the
> purpose to call the update_progress_and_keepalive after pop the error stack,
> even if the corresponding callback is missing ? I thought we can just have "else"
> clause for the check of the existence of callback, but did you choose the current
> goto style for readability ?

I think both styles look fine to me.
I haven't modified this for this version. I'll reconsider if anyone else has
similar thoughts later.

> (4) Name of is_skip_threshold_change
> 
> I also feel the name of is_skip_threshold_change can be changed to
> "exceeded_keepalive_threshold" or something. Other candidates are proposed
> by Peter-san in [2].

Renamed this function to is_keepalive_threshold_exceeded.

Please see the new patch in [1].

[1] -
https://www.postgresql.org/message-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei

Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Peter Smith
Date:
On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:
>
> On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:
> > Here are some comments for the v2-0001 patch.
> >
> > (I haven't looked at the v3 that was posted overnight; maybe some of
> > my comments have already been addressed.)
>
> Thanks for your comments.
>
> > ======
> > General
> >
> > 1. (Info from the commit message)
> > Since we can know whether the change is an end of transaction change in the
> > common code, we removed the LogicalDecodingContext->end_xact introduced
> > in
> > commit f95d53e.
> >
> > ~
> >
> > TBH, it was not clear to me that this change was an improvement. IIUC,
> > it removes the "unnecessary" member, but only does that by replacing
> > it everywhere with a boolean parameter passed to
> > update_progress_and_keepalive(). So the end result seems no less code,
> > but it is less readable code now because you need to know what the
> > true/false parameter means. I wonder if it would have been better just
> > to leave this how it was.
>
> Since I think we can know the meaning of the input based on the parameter name
> of the function, I think both approaches are fine. But the approach in the
> current patch can reduce a member of the structure, so I think this modification
> looks good to me.
>

Hmm, I am not so sure:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

- How is reducing one member better than introducing one new parameter
in multiple calls?

Anyway, I think this exposes another problem. If you still want the
patch to pass the 'finished_xact' parameter separately then AFAICT the
first parameter (ctx) now becomes unused/redundant in the
WalSndUpdateProgressAndKeepalive function, so it ought to be removed.

> > ======
> > src/backend/replication/logical/logical.c
> >
> > 3. General - calls to is_skip_threshold_change()
> >
> > + if (is_skip_threshold_change(ctx))
> > + update_progress_and_keepalive(ctx, false);
> >
> > There are multiple calls like this, which are guarding the
> > update_progress_and_keepalive() with the is_skip_threshold_change()
> > - See truncate_cb_wrapper
> > - See message_cb_wrapper
> > - See stream_change_cb_wrapper
> > - See stream_message_cb_wrapper
> > - See stream_truncate_cb_wrapper
> > - See UpdateDecodingProgressAndKeepalive
> >
> > IIUC, then I was thinking all those conditions maybe can be pushed
> > down *into* the wrapper, thereby making every calling code simpler.
> >
> > e.g. make the wrapper function code look similar to the current
> > UpdateDecodingProgressAndKeepalive:
> >
> > BEFORE (update_progress_and_keepalive)
> > {
> > if (!ctx->update_progress_and_keepalive)
> > return;
> >
> > ctx->update_progress_and_keepalive(ctx, ctx->write_location,
> >    ctx->write_xid, ctx->did_write,
> >    finished_xact);
> > }
> > AFTER
> > {
> > if (!ctx->update_progress_and_keepalive)
> > return;
> >
> > if (finished_xact || is_skip_threshold_change(ctx))
> > {
> > ctx->update_progress_and_keepalive(ctx, ctx->write_location,
> >    ctx->write_xid, ctx->did_write,
> >    finished_xact);
> > }
> > }
>
> Since I want to keep the function update_progress_and_keepalive simple, I didn't
> change it.

Hmm, the reason given seems like a false economy to me. You are able
to keep this 1 function simpler only by adding more complexity to the
calls in 6 other places. Let's see if other people have opinions about
this.

~~~

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+    bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Andres Freund
Date:
Hi,

On 2023-03-03 11:18:04 +1100, Peter Smith wrote:
> - Why is reducing members of LogicalDecodingContext even a goal? I
> thought the LogicalDecodingContext is intended to be the one-stop
> place to hold *all* things related to the "Context" (including that
> member that was deleted).

There's not really a reason to keep it in LogicalDecodingContext after
this change. It was only needed there because of the broken
architectural model of calling UpdateProgress from within output
plugins. Why set a field in each wrapper that we don't need?

> - How is reducing one member better than introducing one new parameter
> in multiple calls?

Reducing the member isn't important; not needing to set it before each
callback, however, makes sense.

Greetings,

Andres Freund



RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"houzj.fnst@fujitsu.com"
Date:
On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:
> On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com>
> wrote:
> > > Here are some comments for the v2-0001 patch.
> > >
> > > (I haven't looked at the v3 that was posted overnight; maybe some of
> > > my comments have already been addressed.)
> >
> > Thanks for your comments.
> >
> > > ======
> > > General
> > >
> > > 1. (Info from the commit message)
> > > Since we can know whether the change is an end of transaction change
> > > in the common code, we removed the
> LogicalDecodingContext->end_xact
> > > introduced in commit f95d53e.
> > >
> > > ~
> > >
> > > TBH, it was not clear to me that this change was an improvement.
> > > IIUC, it removes the "unnecessary" member, but only does that by
> > > replacing it everywhere with a boolean parameter passed to
> > > update_progress_and_keepalive(). So the end result seems no less
> > > code, but it is less readable code now because you need to know what
> > > the true/false parameter means. I wonder if it would have been
> > > better just to leave this how it was.
> >
> > Since I think we can know the meaning of the input based on the
> > parameter name of the function, I think both approaches are fine. But
> > the approach in the current patch can reduce a member of the
> > structure, so I think this modification looks good to me.
> >
> 
...
> 
> Anyway, I think this exposes another problem. If you still want the patch to pass
> the 'finished_xact' parameter separately then AFAICT the first parameter (ctx)
> now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
> function, so it ought to be removed.
> 

I am not sure about this. The first parameter (ctx) has been there since the
lag tracking feature was introduced. I think this is to make it consistent with
other LogicalOutputPluginWriter callbacks. In addition, this is a public
callback function, and users can implement their own logic in this callback
based on the interface, so removing this existing parameter doesn't look great
to me. Although this patch also removes the existing skipped_xact, that's
because we decided to use another parameter, did_write, which can play a
similar role.

Best Regards,
Hou zj

Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Peter Smith
Date:
On Fri, Mar 3, 2023 at 1:27 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:
...
> > Anyway, I think this exposes another problem. If you still want the patch to pass
> > the 'finished_xact' parameter separately then AFAICT the first parameter (ctx)
> > now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
> > function, so it ought to be removed.
> >
>
> I am not sure about this. The first parameter (ctx) has been introduced since
> the Lag tracking feature. I think this is to make it consistent with other
> LogicalOutputPluginWriter callbacks. In addition, this is a public callback
> function and user can implement their own logic in this callbacks based on
> interface, removing this existing parameter doesn't look great to me. Although
> this patch also removes the existing skipped_xact, but it's because we decide
> to use another parameter did_write which can play a similar role.
>

Oh right, that makes sense. Thanks.

Perhaps it just wants some comment to mention that although the
built-in implementation does not use the 'ctx' users might implement
their own logic which does use it.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Fri, Mar 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:
> 

Thanks for your comments.

> 1.
> +
> +static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
> +    bool finished_xact);
> +
> +static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);
> 
> 1a.
> There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

Removed.

> ~
> 
> 1b.
> I did not recognize a reason for the different naming conventions.
> Here are two new functions but one is CamelCase and one is snake_case.
> What are the rules to decide the naming?

In the previous version I used the snake_case style for the function that is now
named UpdateProgressAndKeepalive, but that was confusing because its name matched
a parameter name used in the functions StartupDecodingContext,
CreateInitDecodingContext and CreateDecodingContext. To avoid this confusion, and
since both naming styles exist in this file, I changed it to CamelCase style.

Attached is the new patch.

Regards,
Wang wei

Attachment

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Wang,

Thank you for updating the patch! The following are my comments.

---
01. missing comments
You may have missed the comment from Peter [1]. Or could you point me to the related reply?

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Suppose the timeout parameter is set to a short period and
the transaction contains many sequential operations. In that case a timeout
could occur while ReorderBufferProcessXid() is being called several times.
It may be a bad example, but my point is that we may have to consider the case
where the decoding function has not been implemented yet.

---
03. stream_*_cb_wrapper

Only the stream_*_cb_wrapper functions have the comment "don't call update progress, we didn't really make any", but
there are more functions that do not send updates. Is there a reason why only these have it?

[1]: https://www.postgresql.org/message-id/CAHut%2BPsksiQHuv4A54R4w79TAvCu__PcuffKYY0V96e2z_sEvA%40mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Tue, Mar 7, 2023 15:55 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> Dear Wang,
> 
> Thank you for updating the patch! Followings are my comments.

Thanks for your comments.

> ---
> 01. missing comments
> You might miss the comment from Peter[1]. Or could you pin the related one?

Since I think the functions WalSndPrepareWrite and WalSndWriteData have similar
parameters and the HEAD has no related comments, I'm not sure whether we should
add them in this patch, or in a separate patch to comment atop these callback
functions or where they are called.

> ---
> 02. LogicalDecodingProcessRecord()
> 
> Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
> decoding function? Assuming that the timeout parameter does not have enough
> time
> period and there are so many sequential operations in the transaction. At that
> time
> there may be a possibility that timeout is occurred while calling
> ReorderBufferProcessXid()
> several times.  It may be a bad example, but I meant to say that we may have to
> consider the case that decoding function has not implemented yet.

I think it's ok in this function. If the decoding function has not been
implemented for a record, I think we quickly return to the loop in the function
WalSndLoop, where it will try to send the keepalive message.
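
To illustrate the fallback described above, here is a minimal, self-contained
sketch of a time-based keepalive decision of the kind the main loop relies on.
It is a simplified model, not the walsender code: the function name
keepalive_due, the millisecond timestamps, and the "half of the timeout"
trigger point are all assumptions made for illustration.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical model of a time-based keepalive decision. All names and the
 * "half of the timeout" trigger point are assumptions for illustration.
 */
static bool
keepalive_due(int64_t now_ms, int64_t last_reply_ms, int timeout_ms,
              bool waiting_for_reply)
{
    /* A timeout of zero or less disables the check entirely. */
    if (timeout_ms <= 0)
        return false;

    /* A ping is already outstanding, so do not send another one. */
    if (waiting_for_reply)
        return false;

    /* Ask the peer to reply once half of the timeout has elapsed. */
    return now_ms - last_reply_ms >= timeout_ms / 2;
}
```

In this model the check only helps if the loop that calls it is reached often
enough, which is the point being argued for paths without a decoding function.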

BTW, in the previous discussion [1], we decided to ignore some paths, because
the gain from modifying them may not be so great.

> ---
> 03. stream_*_cb_wrapper
> 
> Only stream_*_cb_wrapper have comments "don't call update progress, we
> didn't really make any", but
> there are more functions that does not send updates. Do you have any reasons
> why only they have?

Added this comment to more functions.
I think the following six functions don't call the function
UpdateProgressAndKeepalive in v5 patch:
- begin_cb_wrapper
- begin_prepare_cb_wrapper
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

I think the comment you mentioned means that no new progress needs to be updated
in this *_cb_wrapper. Also, I think we don't need to update the progress at the
beginning of a transaction, just like in HEAD. So, I added the same comment only
in the 4 functions below:
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

Attached is the new patch.

[1] - https://www.postgresql.org/message-id/20230213180302.u5sqosteflr3zkiz%40awork3.anarazel.de

Regards,
Wang wei

Attachment

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Wang,

Thank you for updating the patch! I have briefly tested your patch
and it worked well in the following cases.

* WalSndUpdateProgressAndKeepalive is called when many inserts have come
  but the publisher does not publish the insertion. PSA the script for this.
* WalSndUpdateProgressAndKeepalive is called when the commit record is not
  related to the specified database.
* WalSndUpdateProgressAndKeepalive is called when many inserts for unlogged
  tables are done.

> > ---
> > 01. missing comments
> > You might miss the comment from Peter[1]. Or could you pin the related one?
> 
> Since I think the functions WalSndPrepareWrite and WalSndWriteData have
> similar
> parameters and the HEAD has no related comments, I'm not sure whether we
> should
> add them in this patch, or in a separate patch to comment atop these callback
> functions or where they are called.

Makes sense, OK.

> > ---
> > 02. LogicalDecodingProcessRecord()
> >
> > Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
> > decoding function? Assuming that the timeout parameter does not have enough
> > time
> > period and there are so many sequential operations in the transaction. At that
> > time
> > there may be a possibility that timeout is occurred while calling
> > ReorderBufferProcessXid()
> > several times.  It may be a bad example, but I meant to say that we may have to
> > consider the case that decoding function has not implemented yet.
> 
> I think it's ok in this function. If the decoding function has not been
> implemented for a record, I think we quickly return to the loop in the function
> WalSndLoop, where it will try to send the keepalive message.

I confirmed that and yes, we will go back to WalSndLoop().

> BTW, in the previous discussion [1], we decided to ignore some paths, because
> the gain from modifying them may not be so great.

I missed the discussion, thanks. Based on that, the code seems right.

The following are my comments.

---
```
+/*
+ * Update progress tracking and send keep alive (if required).
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
```

Can we add comments atop UpdateProgressAndKeepalive()? Currently, developers who
create output plugins must call OutputPluginUpdateProgress(), but from now on the
function is not only renamed but also no longer needs to be called from the plugin
(of course, we do not prevent them from calling it). I think this must be clarified for them.

---
ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

---
Do we have to document the breakage somewhere? I think we do not have
to add an appendix-obsolete-* file because we did not have any links to it, but
we can add a warning in the "Functions for Producing Output" subsection if needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Attachment

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Takamichi Osumi (Fujitsu)"
Date:
Hi,


On Wednesday, March 8, 2023 11:54 AM From: wangw.fnst@fujitsu.com <wangw.fnst@fujitsu.com> wrote:
> Attach the new patch.
Thanks for sharing v6! A few minor comments on it.

(1) commit message

The commit message still refers to the old function name 'is_skip_threshold_change'. We need to update it to
'is_keepalive_threshold_exceeded', I think.

(2) OutputPluginPrepareWrite

@@ -662,7 +656,8 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
        if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in output plugin callbacks, "
+                        "except startup, shutdown, filter_by_origin, and filter_prepare.");

We can remove the period at the end of error string.

(3) is_keepalive_threshold_exceeded's comments

+/*
+ * Helper function to check whether a large number of changes have been skipped
+ * continuously.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)

I suggest updating the comment slightly, to something like the following.
From:
...whether a large number of changes have been skipped continuously
To:
...whether a large number of changes have been skipped without being sent to the output plugin continuously

(4) term for 'keepalive'

+/*
+ * Update progress tracking and send keep alive (if required).
+ */

The 'keep alive' might be better to be replaced with 'keepalive', which looks commonest in other source codes. In the
current patch, there are 3 different ways to express it (the other one is 'keep-alive') and it would be better to unify
the term, at least within the same patch?


Best Regards,
    Takamichi Osumi


Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Peter Smith
Date:
Here are some review comments for v6-0001

======
General.

1.
There are lots of new comments saying:
/* don't call update progress, we didn't really make any */

but is the wording "call update progress" meaningful?

Should that be written something more like:
/* No progress has been made so there is no need to call
UpdateProgressAndKeepalive. */

======

2. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
  ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch? I thought it
was posted a while ago (See [1] comment #8) that this would be
deleted.

~~~

4.

@@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

  /* Pop the error context stack */
  error_context_stack = errcallback.previous;
+
+ UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
 }

~

Are the double parentheses necessary?
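
For context, the expression passed as the second argument in the hunk above
tests whether the transaction is a top-level one. A minimal sketch with
stand-in types follows; DemoTxn and is_top_level are hypothetical names used
only for illustration, not the reorder buffer structures themselves.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a reorder buffer transaction entry. */
typedef struct DemoTxn
{
    struct DemoTxn *toptxn;     /* parent transaction, NULL if top-level */
} DemoTxn;

/*
 * The parentheses around the comparison are optional in C; they do not
 * change the result and only serve readability.
 */
static bool
is_top_level(const DemoTxn *txn)
{
    return (txn->toptxn == NULL);
}
```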

~~~

5. UpdateProgressAndKeepalive

I had previously suggested (See [2] comment #3) that the code might be
simplified if the "is_keepalive_threshold_exceeded(ctx)" check was
pushed down into this function, but it seems like nobody else gave any
opinion for/against that idea yet... so the question still stands.

======
src/backend/replication/walsender.c

6. WalSndUpdateProgressAndKeepalive

Since the 'ctx' is unused here, it might be nicer to annotate that to
make it clear it is deliberate and suppress any possible warnings
about unused params.

e.g. something like:

WalSndUpdateProgressAndKeepalive(
pg_attribute_unused() LogicalDecodingContext *ctx,
XLogRecPtr lsn,
TransactionId xid,
bool did_write,
bool finished_xact)
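
A compilable sketch of that idea is below. DEMO_ATTRIBUTE_UNUSED, DemoContext
and demo_update_progress are hypothetical stand-ins (the macro suggested above
is pg_attribute_unused()), the body is invented for illustration, and the
attribute syntax assumes GCC or Clang.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for an "unused" annotation macro; expands to nothing elsewhere. */
#if defined(__GNUC__) || defined(__clang__)
#define DEMO_ATTRIBUTE_UNUSED __attribute__((unused))
#else
#define DEMO_ATTRIBUTE_UNUSED
#endif

typedef struct DemoContext
{
    int dummy;
} DemoContext;

static uint64_t demo_last_lsn = 0;

/*
 * This implementation ignores ctx, but the parameter stays in the signature
 * because other implementations of the same callback may use it.
 */
static void
demo_update_progress(DEMO_ATTRIBUTE_UNUSED DemoContext *ctx,
                     uint64_t lsn, bool did_write)
{
    /* Remember the position only when something was actually written. */
    if (did_write)
        demo_last_lsn = lsn;
}
```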

------
[1]
https://www.postgresql.org/message-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[2] https://www.postgresql.org/message-id/CAHut%2BPt3ZEMo-KTF%3D5KJSU%2BHdWJD19GPGGCKOmBeM47484Ychw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia.



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Amit Kapila
Date:
On Wed, Mar 8, 2023 at 8:24 AM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:
>
> Attach the new patch.
>

I think this combines multiple improvements in one patch. We can
consider all of them together or maybe it would be better to split
some of those. Do we think it makes sense to split some of the
improvements? I can think of the following:

1. Remove SyncRepRequested() check from WalSndUpdateProgress().
2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
any other similar place.
3. Change the name of ProcessPendingWrites() to WalSndSendPending().
4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
5. The remaining patch.

Now, for (1), we can consider backpatching but I am not sure if it is
worth it because in the worst case, we will miss sending a keepalive.
For (4), it is not clear to me that we have a complete agreement on
the new name. Andres, do you have an opinion on the new name used in
the patch?

If we agree that we don't need to backpatch for (1) and the new name
for (4) is reasonable then we can commit 1-4 as one patch and then
look at the remaining patch.

Thoughts?

--
With Regards,
Amit Kapila.



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Amit Kapila
Date:
On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> 2. rollback_prepared_cb_wrapper
>
>   /*
>   * If the plugin support two-phase commits then rollback prepared callback
>   * is mandatory
> + *
> + * FIXME: This should have been caught much earlier.
>   */
>   if (ctx->callbacks.rollback_prepared_cb == NULL)
>   ereport(ERROR,
>
> ~
>
> Why is this seemingly unrelated FIXME still in the patch?
>

After reading this Fixme comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this and a similar check
in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
function. This is because there is no point in letting prepare pass when
we can't do a rollback or commit prepared. What do you think?

>
> 4.
>
> @@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>
>   /* Pop the error context stack */
>   error_context_stack = errcallback.previous;
> +
> + UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
>  }
>
> ~
>
> Are the double parentheses necessary?
>

Personally, I find this style easier to follow.

--
With Regards,
Amit Kapila.



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Peter Smith
Date:
On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > 2. rollback_prepared_cb_wrapper
> >
> >   /*
> >   * If the plugin support two-phase commits then rollback prepared callback
> >   * is mandatory
> > + *
> > + * FIXME: This should have been caught much earlier.
> >   */
> >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> >   ereport(ERROR,
> >
> > ~
> >
> > Why is this seemingly unrelated FIXME still in the patch?
> >
>
> After reading this Fixme comment and the error message ("logical
> replication at prepare time requires a %s callback
> rollback_prepared_cb"), I think we can move this and a similar check
> in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> function. This is because there is no use of letting prepare pass when
> we can't do a rollback or commit prepared. What do you think?
>

My first impression was it sounds like a good idea to catch the
missing callbacks early as you said.

But if you decide to check for missing commit/rollback callbacks early
in prepare_cb_wrapper(), then won't you also want to have equivalent
checking done earlier for stream_prepare_cb_wrapper()?

And then it quickly becomes a slippery slope to question many other things:
- Why allow startup_cb if shutdown_cb is missing?
- Why allow change_cb if commit_cb or rollback_cb is missing?
- Why allow filter_prepare_cb if prepare_cb is missing?
- etc.

~

So I am wondering if the HEAD code lazy-check of the callback only at
the point where it is needed was actually a deliberate design choice
just to be simpler - e.g. we don't need to be so concerned about any
other callback dependencies.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Rework LogicalOutputPluginWriterUpdateProgress

From
Amit Kapila
Date:
On Fri, Mar 10, 2023 at 11:17 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:
> > >
> > > 2. rollback_prepared_cb_wrapper
> > >
> > >   /*
> > >   * If the plugin support two-phase commits then rollback prepared callback
> > >   * is mandatory
> > > + *
> > > + * FIXME: This should have been caught much earlier.
> > >   */
> > >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> > >   ereport(ERROR,
> > >
> > > ~
> > >
> > > Why is this seemingly unrelated FIXME still in the patch?
> > >
> >
> > After reading this Fixme comment and the error message ("logical
> > replication at prepare time requires a %s callback
> > rollback_prepared_cb"), I think we can move this and a similar check
> > in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> > function. This is because there is no use of letting prepare pass when
> > we can't do a rollback or commit prepared. What do you think?
> >
>
> My first impression was it sounds like a good idea to catch the
> missing callbacks early as you said.
>
> But if you decide to check for missing commit/rollback callbacks early
> in prepare_cb_wrapper(), then won't you also want to have equivalent
> checking done earlier for stream_prepare_cb_wrapper()?
>

Yeah, probably or we can leave the lazy checking as it is. In the
ideal case, we could check for the presence of all the callbacks in
StartupDecodingContext() but we delay it to find the missing methods
later. One possibility is that we check for any missing method in
StartupDecodingContext() if any one of prepare/streaming calls are
present but not sure if that is any better than the current
arrangement.
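
The two arrangements can be contrasted in a small sketch. The names below
(DemoCallbacks, demo_validate_eager, demo_rollback_prepared_lazy) are
hypothetical; the sketch only models "check at startup" versus "check at the
point of use" and is not the logical decoding code.

```c
#include <stdbool.h>
#include <stddef.h>

typedef void (*DemoCallback)(void);

typedef struct DemoCallbacks
{
    DemoCallback prepare_cb;
    DemoCallback commit_prepared_cb;
    DemoCallback rollback_prepared_cb;
} DemoCallbacks;

/* Trivial callback used to populate the struct in examples. */
static void
demo_noop(void)
{
}

/*
 * Eager variant: if the plugin provides prepare_cb, require the related
 * callbacks up front. Returns false when the set is incomplete.
 */
static bool
demo_validate_eager(const DemoCallbacks *cb)
{
    if (cb->prepare_cb == NULL)
        return true;            /* two-phase not used, nothing to check */

    return cb->commit_prepared_cb != NULL &&
           cb->rollback_prepared_cb != NULL;
}

/*
 * Lazy variant: the missing callback is noticed only when it is needed.
 * Returns false where real code would raise an error.
 */
static bool
demo_rollback_prepared_lazy(const DemoCallbacks *cb)
{
    if (cb->rollback_prepared_cb == NULL)
        return false;

    cb->rollback_prepared_cb();
    return true;
}
```

The eager variant has to encode which callbacks depend on which, while the
lazy variant needs no such knowledge; that is the trade-off in question.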

> And then it quickly becomes a slippery slope to question many other things:
> - Why allow startup_cb if shutdown_cb is missing?
>

I am not sure if there is a hard dependency between these two but
their callers do check for Null before invoking those.

> - Why allow change_cb if commit_cb or rollback_cb is missing?

We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
have rollback_cb() defined at all?

> - Why allow filter_prepare_cb if prepare_cb is missing?
>

I am not so sure about this but if prepare gets filtered, we don't
need to invoke prepare_cb.

> - etc.
>
> ~
>
> So I am wondering if the HEAD code lazy-check of the callback only at
> the point where it is needed was actually a deliberate design choice
> just to be simpler - e.g. we don't need to be so concerned about any
> other callback dependencies.
>

Yeah, changing that probably needs some more thought. I have mentioned
one of the possibilities above.

--
With Regards,
Amit Kapila.



RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Mon, Mar 10, 2023 11:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Wed, Mar 8, 2023 at 8:24 AM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > Attach the new patch.
> >
> 
> I think this combines multiple improvements in one patch. We can
> consider all of them together or maybe it would be better to split
> some of those. Do we think it makes sense to split some of the
> improvements? I could think of below:
> 
> 1. Remove SyncRepRequested() check from WalSndUpdateProgress().
> 2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
> any other similar place.
> 3. Change the name of ProcessPendingWrites() to WalSndSendPending().
> 4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
> 5. The remaining patch.

I think it would help to review the different improvements separately, so I split
the patch as suggested.

I also addressed the comments by Kuroda-san, Osumi-san and Peter.
Attached is the new patch set.

Regards,
Wang wei

Attachment

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Wed, Mar 8, 2023 19:06 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> Dear Wang,

Thanks for your testing and comments.

> ---
> ```
> +/*
> + * Update progress tracking and send keep alive (if required).
> + */
> +static void
> +UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
> ```
> 
> Can we add atop the UpdateProgressAndKeepalive()? Currently the developers
> who
> create output plugins must call OutputPluginUpdateProgress(), but from now the
> function is not only renamed but does not have nessesary to call from plugin
> (of cource we do not restrict to call it). I think it must be clarified for them.

Makes sense.
Added some comments atop this function.

> ---
> ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

Removed.

> ---
> Do we have to write a document for the breakage somewhere? I think we do not
> have
> to add appendix-obsolete-* file because we did not have any links for that, but
> we can add a warning in "Functions for Producing Output" subsection if needed.

Since we've moved the feature (update progress and send keepalive) from the
output plugin into the infrastructure, the output plugin is no longer
responsible for maintaining it. Also, I think output plugin developers only
need to remove the call to the old function OutputPluginUpdateProgress if they
get compile errors related to this modification. So, it seems to me that we
don't need to make any related changes in pg-doc.

Regards,
Wang wei

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Wed, Mar 8, 2023 23:55 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:
> Hi,
> 
> 
> On Wednesday, March 8, 2023 11:54 AM From: wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> > Attach the new patch.
> Thanks for sharing v6 ! Few minor comments for the same.

Thanks for your comments.

> (1) commit message
> 
> The old function name 'is_skip_threshold_change' is referred currently. We need
> to update it to 'is_keepalive_threshold_exceeded' I think.

Fixed.

> (2) OutputPluginPrepareWrite
> 
> @@ -662,7 +656,8 @@ void
>  OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
>  {
>         if (!ctx->accept_writes)
> -               elog(ERROR, "writes are only accepted in commit, begin and change
> callbacks");
> +               elog(ERROR, "writes are only accepted in output plugin callbacks, "
> +                        "except startup, shutdown, filter_by_origin, and filter_prepare.");
> 
> We can remove the period at the end of error string.

Removed.

> (3) is_keepalive_threshold_exceeded's comments
> 
> +/*
> + * Helper function to check whether a large number of changes have been
> skipped
> + * continuously.
> + */
> +static bool
> +is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)
> 
> I suggest to update the comment slightly something like below.
> From:
> ...whether a large number of changes have been skipped continuously
> To:
> ...whether a large number of changes have been skipped without being sent to
> the output plugin continuously

Makes sense.
I also slightly corrected the original function comment with a grammar check
tool, so the modified comment looks like this:
```
Helper function to check for continuous skipping of many changes without sending
them to the output plugin.
```
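
As an illustration of what such a helper does, here is a minimal model of a
counter-based threshold check. It is not the code from the patch: the names
DEMO_CHANGES_THRESHOLD, demo_changes_count and demo_threshold_exceeded, as well
as the threshold value of 100, are assumptions.

```c
#include <stdbool.h>

#define DEMO_CHANGES_THRESHOLD 100  /* assumed value, for illustration only */

static int demo_changes_count = 0;

/*
 * Count one skipped change; report true (and reset the counter) once
 * DEMO_CHANGES_THRESHOLD changes have been skipped in a row.
 */
static bool
demo_threshold_exceeded(void)
{
    if (++demo_changes_count < DEMO_CHANGES_THRESHOLD)
        return false;

    demo_changes_count = 0;
    return true;
}
```

Checking a counter is cheaper than reading the clock for every skipped change,
which is presumably why a threshold of this kind is used at all.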

> (4) term for 'keepalive'
> 
> +/*
> + * Update progress tracking and send keep alive (if required).
> + */
> 
> The 'keep alive' might be better to be replaced with 'keepalive', which looks
> commonest in other source codes. In the current patch, there are 3 different
> ways to express it (the other one is 'keep-alive') and it would be better to unify
> the term, at least within the same patch ?

Yes, agree.
Unified the comment you mentioned here ('keep alive') and the comment in the
commit message ('keep-alive') as 'keepalive'.

Regards,
Wang wei

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Thur, Mar 9, 2023 13:26 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some review comments for v6-0001

Thanks for your comments.

> ======
> General.
> 
> 1.
> There are lots of new comments saying:
> /* don't call update progress, we didn't really make any */
> 
> but is the wording "call update progress" meaningful?
> 
> Should that be written something more like:
> /* No progress has been made so there is no need to call
> UpdateProgressAndKeepalive. */

Changed.
Shortened your suggested comment using a grammar tool. So, the modified comment
looks like this:
```
No progress has been made, so don't call UpdateProgressAndKeepalive
```

> ~~~
> 
> 4.
> 
> @@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> 
>   /* Pop the error context stack */
>   error_context_stack = errcallback.previous;
> +
> + UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
>  }
> 
> ~
> 
> Are the double parentheses necessary?

I think the code looks clearer this way.

> ======
> src/backend/replication/walsender.c
> 
> 6. WalSndUpdateProgressAndKeepalive
> 
> Since the 'ctx' is unused here, it might be nicer to annotate that to
> make it clear it is deliberate and suppress any possible warnings
> about unused params.
> 
> e.g. something like:
> 
> WalSndUpdateProgressAndKeepalive(
> pg_attribute_unused() LogicalDecodingContext *ctx,
> XLogRecPtr lsn,
> TransactionId xid,
> bool did_write,
> bool finished_xact)

Because many functions don't use this approach, I’m not sure what the rules are
for using it in PG. And I think that we should discuss this on a separate thread
to check which similar functions need this kind of modification in PG source
code.

Regards,
Wang wei

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Mon, Mar 10, 2023 14:35 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Fri, Mar 10, 2023 at 11:17 AM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com>
> wrote:
> > > >
> > > > 2. rollback_prepared_cb_wrapper
> > > >
> > > >   /*
> > > >   * If the plugin support two-phase commits then rollback prepared callback
> > > >   * is mandatory
> > > > + *
> > > > + * FIXME: This should have been caught much earlier.
> > > >   */
> > > >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> > > >   ereport(ERROR,
> > > >
> > > > ~
> > > >
> > > > Why is this seemingly unrelated FIXME still in the patch?
> > > >
> > >
> > > After reading this Fixme comment and the error message ("logical
> > > replication at prepare time requires a %s callback
> > > rollback_prepared_cb"), I think we can move this and a similar check
> > > in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> > > function. This is because there is no use of letting prepare pass when
> > > we can't do a rollback or commit prepared. What do you think?
> > >
> >
> > My first impression was it sounds like a good idea to catch the
> > missing callbacks early as you said.
> >
> > But if you decide to check for missing commit/rollback callbacks early
> > in prepare_cb_wrapper(), then won't you also want to have equivalent
> > checking done earlier for stream_prepare_cb_wrapper()?
> >
> 
> Yeah, probably or we can leave the lazy checking as it is. In the
> ideal case, we could check for the presence of all the callbacks in
> StartupDecodingContext() but we delay it to find the missing methods
> later. One possibility is that we check for any missing method in
> StartupDecodingContext() if any one of prepare/streaming calls are
> present but not sure if that is any better than the current
> arrangement.
> 
> > And then it quickly becomes a slippery slope to question many other things:
> > - Why allow startup_cb if shutdown_cb is missing?
> >
> 
> I am not sure if there is a hard dependency between these two but
> their callers do check for Null before invoking those.
> 
> > - Why allow change_cb if commit_cb or rollback_cb is missing?
> 
> We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
> have rollback_cb() defined at all?
> 
> > - Why allow filter_prepare_cb if prepare_cb is missing?
> >
> 
> I am not so sure about this but If prepare gets filtered, we don't
> need to invoke prepare_cb.
> 
> > - etc.
> >
> > ~
> >
> > So I am wondering if the HEAD code lazy-check of the callback only at
> > the point where it is needed was actually a deliberate design choice
> > just to be simpler - e.g. we don't need to be so concerned about any
> > other callback dependencies.
> >
> 
> Yeah, changing that probably needs some more thought. I have mentioned
> one of the possibilities above.

This approach looks fine to me, so I wrote a separate patch (0006) so that it
can be discussed and reviewed.

Regards,
Wang wei

RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"Takamichi Osumi (Fujitsu)"
Date:
Hi,


On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:
> Attach the new patch set.
Thanks for updating the patch! One review comment on v7-0005.

stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of threshold check and
UpdateProgressAndKeepalive, unlike other write wrapper functions, as shown below. But both of them write some data to the
output plugin and set the did_write flag, and thus update the subscriber's last_recv_timestamp used for the timeout check
in LogicalRepApplyLoop. So it looks like adding the pair to both functions would be more accurate, in order to reset the
changes_count counter on the publisher?

@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }


Best Regards,
    Takamichi Osumi


RE: Rework LogicalOutputPluginWriterUpdateProgress

From
"wangw.fnst@fujitsu.com"
Date:
On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:
> Hi,
> 
> 
> On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com>
> wrote:
> > Attach the new patch set.
> Thanks for updating the patch ! One review comment on v7-0005.

Thanks for your comment.

> stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
> threshold check and UpdateProgressAndKeepalive unlike other write wrapper
> functions like below. But, both of them write some data to the output plugin, set
> the flag of did_write and thus it updates the subscriber's last_recv_timestamp
> used for timeout check in LogicalRepApplyLoop. So, it looks adding the pair to
> both functions can be more accurate, in order to reset the counter in
> changes_count on the publisher ?
> 
> @@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> 
>         /* Pop the error context stack */
>         error_context_stack = errcallback.previous;
> +
> +       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
>  }

I think stream_start/stop_cb are different from change_cb: they don't
represent records in WAL, so the LSNs corresponding to these two
messages are the LSNs of other records. Therefore, we don't call the function
UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
didn't reset the counter here.

[1] -
https://www.postgresql.org/message-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei

Re: Rework LogicalOutputPluginWriterUpdateProgress

From
vignesh C
Date:
On Mon, 13 Mar 2023 at 08:17, wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:
>
> On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:
> > Hi,
> >
> >
> > On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com>
> > wrote:
> > > Attach the new patch set.
> > Thanks for updating the patch ! One review comment on v7-0005.
>
> Thanks for your comment.
>
> > stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
> > threshold check and UpdateProgressAndKeepalive unlike other write wrapper
> > functions like below. But, both of them write some data to the output plugin, set
> > the flag of did_write and thus it updates the subscriber's last_recv_timestamp
> > used for timeout check in LogicalRepApplyLoop. So, it looks adding the pair to
> > both functions can be more accurate, in order to reset the counter in
> > changes_count on the publisher ?
> >
> > @@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache,
> > ReorderBufferTXN *txn,
> >
> >         /* Pop the error context stack */
> >         error_context_stack = errcallback.previous;
> > +
> > +       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
> >  }
>
> Since I think stream_start/stop_cb are different from change_cb, they don't
> represent records in wal, so I think the LSNs corresponding to these two
> messages are the LSNs of other records. So, we don't call the function
> UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
> didn't reset the counter here.

As there has been no activity in this thread and there does not seem to have
been much interest in this over the last 9 months, I have changed the
status of the patch to "Returned with Feedback".

Regards,
Vignesh