Re: Rework LogicalOutputPluginWriterUpdateProgress - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Rework LogicalOutputPluginWriterUpdateProgress
Date
Msg-id CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com
Whole thread Raw
In response to RE: Rework LogicalOutputPluginWriterUpdateProgress  ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
Responses RE: Rework LogicalOutputPluginWriterUpdateProgress  ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
List pgsql-hackers
Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}
}


~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
  if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" would be more appropriate here?

~~~

8. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

~~~


9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc) because seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
*  changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required.  Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)

~~~

11. update_progress_and_keepalive

/*
 * Update progress tracking and send keep alive (if required).
 */
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
  bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
   ctx->write_xid, ctx->did_write,
   finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)


======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
     now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
    wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.


------
Kind Regards,
Peter Smith.
Fujitsu Australia



pgsql-hackers by date:

Previous
From: Nathan Bossart
Date:
Subject: Re: Improve WALRead() to suck data directly from WAL buffers when possible
Next
From: Andres Freund
Date:
Subject: Re: Should vacuum process config file reload more often