RE: Rework LogicalOutputPluginWriterUpdateProgress - Mailing list pgsql-hackers
From | wangw.fnst@fujitsu.com |
---|---|
Subject | RE: Rework LogicalOutputPluginWriterUpdateProgress |
Date | |
Msg-id | OS3PR01MB6275C6CA72222C0C23730A319EAD9@OS3PR01MB6275.jpnprd01.prod.outlook.com Whole thread Raw |
In response to | Re: Rework LogicalOutputPluginWriterUpdateProgress (Peter Smith <smithpb2250@gmail.com>) |
Responses |
Re: Rework LogicalOutputPluginWriterUpdateProgress
|
List | pgsql-hackers |
On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote: > Here are some comments for the v2-0001 patch. > > (I haven't looked at the v3 that was posted overnight; maybe some of > my comments have already been addressed.) Thanks for your comments. > ====== > General > > 1. (Info from the commit message) > Since we can know whether the change is an end of transaction change in the > common code, we removed the LogicalDecodingContext->end_xact introduced > in > commit f95d53e. > > ~ > > TBH, it was not clear to me that this change was an improvement. IIUC, > it removes the "unnecessary" member, but only does that by replacing > it everywhere with a boolean parameter passed to > update_progress_and_keepalive(). So the end result seems no less code, > but it is less readable code now because you need to know what the > true/false parameter means. I wonder if it would have been better just > to leave this how it was. Since I think we can know the meaning of the input based on the parameter name of the function, I think both approaches are fine. But the approach in the current patch can reduce a member of the structure, so I think this modification looks good to me. > ====== > src/backend/replication/logical/logical.c > > 2. General - blank lines > > There are multiple places in this file where the patch removed some > statements but left blank lines. The result is 2 blank lines remaining > instead of one. > > see change_cb_wrapper. > see truncate_cb_wrapper. > see stream_start_cb_wrapper. > see stream_stop_cb_wrapper. > see stream_change_cb_wrapper. > > e.g. > > BEFORE > ctx->write_location = last_lsn; > > ctx->end_xact = false; > > /* in streaming mode, stream_stop_cb is required */ > > AFTER (now there are 2 blank lines) > ctx->write_location = last_lsn; > > > /* in streaming mode, stream_stop_cb is required */ Removed. > ~~~ > > 3. General - calls to is_skip_threshold_change() > > + if (is_skip_threshold_change(ctx)) > + update_progress_and_keepalive(ctx, false); > > There are multiple calls like this, which are guarding the > update_progress_and_keepalive() with the is_skip_threshold_change() > - See truncate_cb_wrapper > - See message_cb_wrapper > - See stream_change_cb_wrapper > - See stream_message_cb_wrapper > - See stream_truncate_cb_wrapper > - See UpdateDecodingProgressAndKeepalive > > IIUC, then I was thinking all those conditions maybe can be pushed > down *into* the wrapper, thereby making every calling code simpler. > > e.g. make the wrapper function code look similar to the current > UpdateDecodingProgressAndKeepalive: > > BEFORE (update_progress_and_keepalive) > { > if (!ctx->update_progress_and_keepalive) > return; > > ctx->update_progress_and_keepalive(ctx, ctx->write_location, > ctx->write_xid, ctx->did_write, > finished_xact); > } > AFTER > { > if (!ctx->update_progress_and_keepalive) > return; > > if (finished_xact || is_skip_threshold_change(ctx)) > { > ctx->update_progress_and_keepalive(ctx, ctx->write_location, > ctx->write_xid, ctx->did_write, > finished_xact); > } > } Since I want to keep the function update_progress_and_keepalive simple, I didn't change it. > ~~~ > > 4. StartupDecodingContext > > @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin, > XLogReaderRoutine *xl_routine, > LogicalOutputPluginWriterPrepareWrite prepare_write, > LogicalOutputPluginWriterWrite do_write, > - LogicalOutputPluginWriterUpdateProgress update_progress) > + LogicalOutputPluginWriterUpdateProgressAndKeepalive > update_progress_and_keepalive) > > TBH, I find it confusing that the new parameter name > ('update_progress_and_keepalive') is identical to the static function > name in the same C source file. It introduces a kind of unnecessary > shadowing and makes it harder to search/read the code. > > I suggest just calling this param something unique and local to the > function like 'do_update_keepalive'. > > ~~~ > 5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin, > XLogReaderRoutine *xl_routine, > LogicalOutputPluginWriterPrepareWrite prepare_write, > LogicalOutputPluginWriterWrite do_write, > - LogicalOutputPluginWriterUpdateProgress update_progress) > + LogicalOutputPluginWriterUpdateProgressAndKeepalive > update_progress_and_keepalive) > > (Ditto previous comment #4) > > TBH, I find it confusing that the new parameter name > ('update_progress_and_keepalive') is identical to the static function > name in the same C source file. It introduces a kind of unnecessary > shadowing and makes it harder to search/read the code. > > I suggest just calling this param something unique and local to the > function like 'do_update_keepalive'. > ~~~ > > 6. CreateDecodingContext > > @@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, > XLogReaderRoutine *xl_routine, > LogicalOutputPluginWriterPrepareWrite prepare_write, > LogicalOutputPluginWriterWrite do_write, > - LogicalOutputPluginWriterUpdateProgress update_progress) > + LogicalOutputPluginWriterUpdateProgressAndKeepalive > update_progress_and_keepalive) > > (Ditto previous comment #4) > > TBH, I find it confusing that the new parameter name > ('update_progress_and_keepalive') is identical to the static function > name in the same C source file. It introduces a kind of unnecessary > shadowing and makes it harder to search/read the code. > > I suggest just calling this param something unique and local to the > function like 'do_update_keepalive'. I'm not sure if 'do_update_keepalive' is accurate. So, to distinguish this function from the parameter, I renamed the function to 'UpdateProgressAndKeepalive'. > ~~~ > > 7. OutputPluginPrepareWrite > > @@ -662,7 +657,7 @@ void > OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write) > { > if (!ctx->accept_writes) > - elog(ERROR, "writes are only accepted in commit, begin and change callbacks"); > + elog(ERROR, "writes are only accepted in callbacks in the > OutputPluginCallbacks structure (except startup, shutdown, > filter_by_origin and filter_prepare callbacks)"); > > It seems a confusing error message. Can it be worded better? I tried to improve this message in the new patch. Do you have any suggestions to improve it? > Also, I > noticed this flag is never used except in this one place where it > throws an error, so would an "Assert" would be more appropriate here? I'm not sure if we should change errors to assertions here. > ~~~ > > 8. rollback_prepared_cb_wrapper > > /* > * If the plugin support two-phase commits then rollback prepared callback > * is mandatory > + * > + * FIXME: This should have been caught much earlier. > */ > if (ctx->callbacks.rollback_prepared_cb == NULL) > ~ > Is this FIXME related to the current patch, or should this be an > entirely different topic? I think this FIXME seems to be another topic and I will delete this FIXME later. > ~~~ > > > 9. is_skip_threshold_change > > The current usage for this function is like: > > if (is_skip_threshold_change(ctx)) > + update_progress_and_keepalive(ctx, false); > > ~ > > IMO a better name for this function might be like > 'is_change_threshold_exceeded()' (or > 'is_keepalive_threshold_exceeded()' etc) because seems more readable > to say > > if (is_change_threshold_exceeded()) > do_something(); Renamed this function to is_keepalive_threshold_exceeded. > ~~~ > > 10. is_skip_threshold_change > > static bool > is_skip_threshold_change(struct LogicalDecodingContext *ctx) > { > static int changes_count = 0; /* used to accumulate the number of > * changes */ > > /* If the change was published, reset the counter and return false */ > if (ctx->did_write) > { > changes_count = 0; > return false; > } > > /* > * It is possible that the data is not sent to downstream for a long time > * either because the output plugin filtered it or there is a DDL that > * generates a lot of data that is not processed by the plugin. So, in > * such cases, the downstream can timeout. To avoid that we try to send a > * keepalive message if required. Trying to send a keepalive message > * after every change has some overhead, but testing showed there is no > * noticeable overhead if we do it after every ~100 changes. > */ > #define CHANGES_THRESHOLD 100 > if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD) > { > changes_count = 0; > return true; > } > > return false; > } > > ~ > > That 2nd condition checking if (!ctx->did_write && ++changes_count >= > CHANGES_THRESHOLD) does not seem right. There is no need to check the > ctx->did_write; it must be false because it was checked earlier in the > function: > > BEFORE > if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD) > > SUGGESTION1 > Assert(!ctx->did_write); > if (++changes_count >= CHANGES_THRESHOLD) > > SUGGESTION2 > if (++changes_count >= CHANGES_THRESHOLD) Fixed. I think the second suggestion looks better to me. > ~~~ > > 11. update_progress_and_keepalive > > /* > * Update progress tracking and send keep alive (if required). > */ > static void > update_progress_and_keepalive(struct LogicalDecodingContext *ctx, > bool finished_xact) > { > if (!ctx->update_progress_and_keepalive) > return; > > ctx->update_progress_and_keepalive(ctx, ctx->write_location, > ctx->write_xid, ctx->did_write, > finished_xact); > } > > ~ > > Maybe it's simpler to code this without the return. > > e.g. > > if (ctx->update_progress_and_keepalive) > { > ctx->update_progress_and_keepalive(ctx, ctx->write_location, > ctx->write_xid, ctx->did_write, > finished_xact); > } > > (it is just generic suggested code for example -- I made some other > review comments overlapping this) I think these two approaches are fine. But because I think the approach in the current patch is consistent with the style of other functions, I didn't change it. > ====== > .../replication/logical/reorderbuffer.c > > 12. ReorderBufferAbort > > + UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb- > >private_data, > + xid, lsn, !TransactionIdIsValid(txn->toplevel_xid)); > + > > I didn't really recognise how the > "!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean > 'finished_xact' param. Can this call have an explanatory comment about > how it works? It seems confusing to use txn->toplevel_xid to check whether it is top transaction. Because the comment of txn->toptxn shows the meaning of value, I updated the patch to use txn->toptxn to check this. > ====== > src/backend/replication/walsender.c > ~~~ > > 13. WalSndUpdateProgressAndKeepalive > > - if (pending_writes || (!end_xact && > + if (pending_writes || (!finished_xact && wal_sender_timeout > 0 && > now >= TimestampTzPlusMilliseconds(last_reply_timestamp, > wal_sender_timeout / 2))) > - ProcessPendingWrites(); > + WalSndSendPending(); > > Is this new function name OK to be WalSndSendPending? From this code, > we can see it can also be called in other scenarios even when there is > nothing "pending" at all. I think this function is used to flush pending data or send keepalive message. But I'm not sure if we should add keepalive related string to the function name, which seems to make this function name too long. Attach the new patch. Regards, Wang wei
Attachment
pgsql-hackers by date: