Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding - Mailing list pgsql-hackers

From Ajin Cherian
Subject Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
Msg-id CAFPTHDac7PodARv1S5rBEJB-MGDjjDovjKMV_gypDj3_4wnzpA@mail.gmail.com
In response to RE: Proposal: Filter irrelevant change before reassemble transactions during logical decoding  ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>)
List pgsql-hackers
On Thu, Feb 20, 2025 at 8:42 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> Dear Ajin,
>
> Here are my comments. I must play with the patches to understand more.
>
> 01.
> ```
> extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
>                                                 XLogRecPtr lsn,
>                                                 RelFileLocator *rlocator,
>                                                 ReorderBufferChangeType change_type,
>                                                 bool has_tuple);
> ```
>
> Can you explain why "has_tuple" is needed? All callers set it to true.
>

It is not needed. Removed.
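With has_tuple gone, the declaration reduces to roughly the following
(whitespace in the actual patch may differ):

```
extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
                                                XLogRecPtr lsn,
                                                RelFileLocator *rlocator,
                                                ReorderBufferChangeType change_type);
```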


> 02.
> ```
> static Relation
> ReorderBufferGetRelation(ReorderBuffer *rb, RelFileLocator *rlocator,
>                                                  bool has_tuple)
> ```
>
> Hmm, the naming is a bit confusing to me. This operation is mostly unrelated to
> the reorder buffer. How about "GetPossibleDecodableRelation" or something?
>
> 03.
> ```
>                 if (IsToastRelation(relation))
>                 {
>                         Oid     real_reloid = InvalidOid;
>                         char   *toast_name = RelationGetRelationName(relation);
>                         /* pg_toast_ len is 9 */
>                         char   *start_ch = &toast_name[9];
>
>                         real_reloid = pg_strtoint32(start_ch);
>                         entry->relid = real_reloid;
>                 }
> ```
>
> It is a bit hacky to me. How about using sscanf like in the attached?
>

I have used the code that you shared to modify this.
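For the archives, the sscanf-based version looks roughly like this
(reconstructed from your suggestion; the attached patch is authoritative):

```
if (IsToastRelation(relation))
{
    Oid real_reloid = InvalidOid;

    /* Toast relation names follow the "pg_toast_<parent oid>" convention. */
    if (sscanf(RelationGetRelationName(relation), "pg_toast_%u", &real_reloid) == 1)
        entry->relid = real_reloid;
}
```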


> 04.
>
> IIUC, toast tables always require the filter_change() call twice, is that right?
> My understanding is as below:
>
> 1. ReorderBufferFilterByRelFileLocator() tries to filter the change at outside the
>    transaction. The OID indicates the pg_toast_xxx table.
> 2. pgoutput_filter_change() cannot find the table from the hash. It returns false
>    with cache_valid=false.
> 3. ReorderBufferFilterByRelFileLocator() starts a transaction and get its relation.
> 4. The function recognizes the relation seems toast and get parent oid.
> 5. The function tries to filter the change in the transaction, with the parent oid.
> 6. pgoutput_filter_change()->get_rel_sync_entry() enters the parent relation into
>    the hash and determines whether the change is filterable or not.
> 7. Sometime later, the same table is modified again. But the toast table is not
>    stored in the hash, so the whole of steps 1-6 is required again.
>
> I feel this may affect performance when many toast tables are modified. How about
> skipping the check for toasted ones? ISTM IsToastNamespace() can be used for the decision.
>

I understand your concern, but I also think that the benefits are
higher for toast changes.

On Fri, Feb 21, 2025 at 6:26 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Hi Ajin,
>
> Some review comments for patch v14-0001.
>
> ======
> src/backend/replication/logical/reorderbuffer.c
>
> 1.
> + *   We also try and filter changes that are not relevant for logical decoding
> + *   as well as give the option for plugins to filter changes in advance.
> + *   Determining whether to filter a change requires information about the
> + *   relation from the catalog, requring a transaction to be started.
> + *   When most changes in a transaction are unfilterable, the overhead of
> + *    starting a transaction for each record is significant. To reduce this
> + *    overhead a hash cache of relation file locators is created. Even then a
> + *    hash search for every record before recording has considerable overhead
> + *    especially for use cases where most tables in an instance are not filtered.
> + *    To further reduce this overhead a simple approach is used to suspend
> + *    filtering for a certain number of changes CHANGES_THRESHOLD_FOR_FILTER
> + *    when an unfilterable change is encountered. In other words, continue
> + *    filtering changes if the last record was filtered out. If an unfilterable
> + *    change is found, skip filtering the next CHANGES_THRESHOLD_FOR_FILTER
> + *    changes.
> + *
>
> 1a.
> /try and filter/try to filter/
>
> ~
>
> 1b.
> There is some leading whitespace problem happening (spaces instead of tabs?)
>
> ~
>
> 1c.
> Minor rewording
>
> SUGGESTION (e.g. anyway this should be identical to the commit message text)
>
> Determining whether to filter a change requires information about the
> relation and the publication from the catalog, which means a
> transaction must be started. But, the overhead of starting a
> transaction for each record is significant. To reduce this overhead a
> hash cache of relation file locators is used to remember which
> relations are filterable.
>
> Even so, doing a hash search for every record has considerable
> overhead, especially for scenarios where most tables in an instance
> are published. To further reduce overheads a simple approach is used:
> When an unfilterable change is encountered we suspend filtering for a
> certain number (CHANGES_THRESHOLD_FOR_FILTER) of subsequent changes.
> In other words, continue filtering until an unfilterable change is
> encountered; then skip filtering the next CHANGES_THRESHOLD_FOR_FILTER
> changes, before attempting filtering again.
>
> ~~~
>

I have modified the whole commit message and these comments based on
inputs from Hou-san.

> 2.
> +/*
> + * After encountering a change that cannot be filtered out, filtering is
> + * temporarily suspended. Filtering resumes after processing every 100 changes.
> + * This strategy helps to minimize the overhead of performing a hash table
> + * search for each record, especially when most changes are not filterable.
> + */
> +#define CHANGES_THRESHOLD_FOR_FILTER 100
>
> Maybe you can explain where this magic number comes from.
>
> SUGGESTION
> The CHANGES_THRESHOLD_FOR_FILTER value of 100 was chosen as the best
> trade-off value after performance tests were carried out using
> candidate values 10, 50, 100, and 200.
>

I am not so sure, as my test scenarios were very limited; I don't
think I can say that 100 is the best value for all scenarios.
That is why I suggest making this a user-changeable GUC variable,
if others agree.
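If we do go the GUC route, I imagine an entry roughly like this in
guc_tables.c (the name, group, and bounds below are only illustrative,
and the backing int variable would still need to be declared):

```
{
    {"logical_decoding_filter_threshold", PGC_USERSET, REPLICATION_SENDING,
        gettext_noop("Sets the number of changes to skip filtering after "
                     "an unfilterable change is encountered."),
        NULL
    },
    &logical_decoding_filter_threshold, /* hypothetical variable */
    100, 0, INT_MAX,
    NULL, NULL, NULL
},
```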

> ~~~
>
> ReorderBufferQueueChange:
>
> 3.
> + /*
> +  * If filtering was suspended and we've crossed the change threshold,
> +  * attempt to filter again
> +  */
> + if (!rb->can_filter_change &&
> +     (++rb->unfiltered_changes_count >= CHANGES_THRESHOLD_FOR_FILTER))
> + {
> +     rb->can_filter_change = true;
> +     rb->unfiltered_changes_count = 0;
> + }
> +
>
> /If filtering was suspended/If filtering is currently suspended/
>
> ~~~

Fixed.

>
> ReorderBufferGetRelation:
>
> 4.
> +static Relation
> +ReorderBufferGetRelation(ReorderBuffer *rb, RelFileLocator *rlocator,
> + bool has_tuple)
>
> My suggested [2-#4] name change 'ReorderBufferGetRelationForDecoding'
> is not done yet. I saw Kuroda-san also said this name was confusing
> [1-#02], and suggested something similar
> 'GetPossibleDecodableRelation'.
>

Changed it to GetDecodableRelation().

> ~~~
>
> RelFileLocatorCacheInvalidateCallback:
>
> 5.
> + /*
> +  * If relid is InvalidOid, signaling a complete reset, we must remove
> +  * all entries, otherwise just remove the specific relation's entry.
> +  * Always remove negative cache entries.
> +  */
> + if (relid == InvalidOid ||        /* complete reset */
> +     entry->relid == InvalidOid || /* invalid cache entry */
> +     entry->relid == relid)        /* individual flushed relation */
> + {
> +     if (hash_search(RelFileLocatorFilterCache,
> +                     &entry->key,
> +                     HASH_REMOVE,
> +                     NULL) == NULL)
> +         elog(ERROR, "hash table corrupted");
> + }
>
>
> 5a.
> IMO the relid *parameter* should be mentioned explicitly to
> disambiguate relid from the entry->relid.
>
> /If relid is InvalidOid, signaling a complete reset,/If a complete
> reset is requested (when 'relid' parameter is InvalidOid),/
>
> ~
>
> 5b.
> /Always remove negative cache entries./Remove any invalid cache
> entries (these are indicated by invalid entry->relid)/
>
> ~~~

Fixed.

>
> ReorderBufferFilterByRelFileLocator:
>
> 6.
> I previously [2-#7] had suggested this function code could be
> refactored to share some common return logic. It is not done, but OTOH
> there is no reply, so I don't know if it was overlooked or simply
> rejected.
>

I had considered this, but I felt it is more readable this way.

> ======
> src/include/replication/reorderbuffer.h
>
> 7.
> + /* should we try to filter the change? */
> + bool can_filter_change;
> +
> + /* number of changes after a failed attempt at filtering */
> + int8 unfiltered_changes_count;
> +
>
> The potential renaming of that 'can_filter_change' field to something
> better is still an open item IMO [2-#8] pending consensus on what a
> better name for this might be.
>
I haven't changed this.


On Sun, Feb 23, 2025 at 2:19 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Hi Ajin,
>
> Some review comments for v14-0002.
>
> ======
> src/backend/replication/logical/decode.c
>
> 1.
> There is a lot of nearly duplicate code checking to see if a change is filterable:
>
> DecodeInsert:
> + /*
> + * When filtering changes, determine if the relation associated with the change
> + * can be skipped. This could be because the relation is unlogged or because
> + * the plugin has opted to exclude this relation from decoding.
> + */
> + if (FilterChangeIsEnabled(ctx) &&
>   ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
> - buf->origptr, &target_locator, true))
> + buf->origptr, &target_locator,
> + REORDER_BUFFER_CHANGE_INSERT,
> + true))
>
>
> DecodeUpdate:
> + /*
> + * When filtering changes, determine if the relation associated with the change
> + * can be skipped. This could be because the relation is unlogged or because
> + * the plugin has opted to exclude this relation from decoding.
> + */
> + if (FilterChangeIsEnabled(ctx) &&
> + ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
> + buf->origptr, &target_locator,
> + REORDER_BUFFER_CHANGE_UPDATE,
> + true))
> + return;
> +
>
> DecodeDelete:
> + /*
> + * When filtering changes, determine if the relation associated with the change
> + * can be skipped. This could be because the relation is unlogged or because
> + * the plugin has opted to exclude this relation from decoding.
> + */
> + if (FilterChangeIsEnabled(ctx) &&
> + ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
> + buf->origptr, &target_locator,
> + REORDER_BUFFER_CHANGE_DELETE,
> + true))
> + return;
> +
>
> DecodeMultiInsert:
>   /*
> + * When filtering changes, determine if the relation associated with the change
> + * can be skipped. This could be because the relation is unlogged or because
> + * the plugin has opted to exclude this relation from decoding.
> + */
> + if (FilterChangeIsEnabled(ctx) &&
> + ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
> + buf->origptr, &rlocator,
> + REORDER_BUFFER_CHANGE_INSERT,
> + true))
> + return;
> +
>
> DecodeSpecConfirm:
> + /*
> + * When filtering changes, determine if the relation associated with the change
> + * can be skipped. This could be because the relation is unlogged or because
> + * the plugin has opted to exclude this relation from decoding.
> + */
> + if (FilterChangeIsEnabled(ctx) &&
> + ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
> + buf->origptr, &target_locator,
> + REORDER_BUFFER_CHANGE_INSERT,
> + true))
> + return;
> +
>
> Can't all those code fragments (DecodeInsert, DecodeUpdate,
> DecodeDelete, DecodeMultiInsert, DecodeSpecConfirm) delegate to a
> new/common 'SkipThisChange(...)' function?
>

Fixed this accordingly.
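The common helper now looks roughly like this (a sketch; note has_tuple
is gone per Kuroda-san's earlier comment, so the actual signature in the
patch may differ):

```
static bool
SkipThisChange(LogicalDecodingContext *ctx, XLogRecPtr origptr, TransactionId xid,
               RelFileLocator *rlocator, ReorderBufferChangeType change_type)
{
    /*
     * When filtering changes, determine if the relation associated with the
     * change can be skipped, either because the relation is unlogged or
     * because the plugin has opted to exclude it from decoding.
     */
    return FilterChangeIsEnabled(ctx) &&
        ReorderBufferFilterByRelFileLocator(ctx->reorder, xid, origptr,
                                            rlocator, change_type);
}
```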

> ======
> src/backend/replication/logical/reorderbuffer.c
>
> 2.
>  /*
>   * After encountering a change that cannot be filtered out, filtering is
> - * temporarily suspended. Filtering resumes after processing every 100 changes.
> + * temporarily suspended. Filtering resumes after processing CHANGES_THRESHOLD_FOR_FILTER
> + * changes.
>   * This strategy helps to minimize the overhead of performing a hash table
>   * search for each record, especially when most changes are not filterable.
>   */
> -#define CHANGES_THRESHOLD_FOR_FILTER 100
> +#define CHANGES_THRESHOLD_FOR_FILTER 0
>
> Why is this defined as 0? Some accidental residue from performance
> testing different values?
>

I need it to remain 0 for my tests to work. I have moved these
changes and tests out to a new patch 0003.

> ======
> src/test/subscription/t/001_rep_changes.pl
>
> 3.
> +# Check that an unpublished change is filtered out.
> +$logfile = slurp_file($node_publisher->logfile, $log_location);
> +ok($logfile =~ qr/Filtering INSERT/,
> + 'unpublished INSERT is filtered');
> +
> +ok($logfile =~ qr/Filtering UPDATE/,
> + 'unpublished UPDATE is filtered');
> +
> +ok($logfile =~ qr/Filtering DELETE/,
> + 'unpublished DELETE is filtered');
> +
>
> AFAICT these are probably getting filtered out because the entire
> table is not published at all.
>
> Should you also add different tests where you do operations on a table
> that IS partially replicated but only some of the *operations* are not
> published. e.g. test the different 'pubactions' of the PUBLICATION
> 'publish' parameter. Maybe you need different log checks to
> distinguish the different causes for the filtering.
>

I've modified the commit message in 0001 based on inputs from Hou-san.
Modified and added new tests in patch 0003.

regards,
Ajin Cherian
Fujitsu Australia
