Thread: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
li jie
Date:
Hi,

During logical decoding, if there is a large write transaction, some spill files will be written to disk, depending on the setting of max_changes_in_memory. This behavior can effectively avoid OOM, but if the transaction generates many changes before commit, a large number of files may fill the disk; for example, an update of a TB-level table. Of course, this is also inevitable.

But I found an inelegant phenomenon: if the updated large table is not published, its changes are still written out as a large number of spill files. Look at the example below:

publisher:
```
create table tbl_pub(id int, val1 text, val2 text, val3 text);
create table tbl_t1(id int, val1 text, val2 text, val3 text);
CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;
```

subscriber:
```
create table tbl_pub(id int, val1 text, val2 text, val3 text);
create table tbl_t1(id int, val1 text, val2 text, val3 text);
CREATE SUBSCRIPTION mysub CONNECTION 'host=127.0.0.1 port=5432 user=postgres dbname=postgres' PUBLICATION mypub;
```

publisher:
```
begin;
insert into tbl_t1 select i, repeat('xyzzy', i), repeat('abcba', i), repeat('dfds', i) from generate_series(0, 999999) i;
```

Later you will see a large number of spill files in the "$PGDATA/pg_replslot/mysub/" directory:

```
$ ll -sh
total 4.5G
4.0K -rw------- 1 postgres postgres  200 Nov 30 09:24 state
 17M -rw------- 1 postgres postgres  17M Nov 30 08:22 xid-750-lsn-0-10000000.spill
 12M -rw------- 1 postgres postgres  12M Nov 30 08:20 xid-750-lsn-0-1000000.spill
 17M -rw------- 1 postgres postgres  17M Nov 30 08:23 xid-750-lsn-0-11000000.spill
......
```

We can see that table tbl_t1 is not published in mypub, and its changes are not sent to the subscriber either. Only after the transaction is reassembled does the pgoutput decoding plugin filter out the changes of unpublished relations when sending logical changes; see function pgoutput_change.

Above all: if, after constructing a change and before queuing it into a transaction, we filtered out the changes of unpublished relations, would it make logical decoding less laborious and avoid disk growth as much as possible?

This is just an immature idea. I haven't started to implement it yet. Maybe it was designed this way because there are key factors that I didn't consider, so I want to hear everyone's opinions, especially from the designers of logical decoding.
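For reference, the filter in pgoutput_change that is mentioned above looks roughly like the following (a simplified sketch of the logic, not a verbatim excerpt from the PostgreSQL source). Note that by the time this runs, the whole transaction has already been reassembled and possibly spilled to disk:

```
	/* Sketch: look up publication info for the relation ... */
	relentry = get_rel_sync_entry(data, relation);

	/* ... and silently drop changes the relation is not published for. */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			if (!relentry->pubactions.pubinsert)
				return;
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			if (!relentry->pubactions.pubupdate)
				return;
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			if (!relentry->pubactions.pubdelete)
				return;
			break;
		default:
			break;
	}
```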
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
li jie
Date:
> This is just an immature idea. I haven't started to implement it yet.
> Maybe it was designed this way because there
> are key factors that I didn't consider. So I want to hear everyone's
> opinions, especially the designers of logic decoding.

Attached is the patch I used to implement this optimization. The main designs are as follows:

1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.

2. Added this callback function pgoutput_table_filter for the pgoutput plugin. Its main implementation is based on the table filter in the pgoutput_change function.

3. After constructing a change and before queuing the change into a transaction, use RelidByRelfilenumber to obtain the relation associated with the change, just like obtaining the relation in the ReorderBufferProcessTXN function.

4. The relation may be a TOAST table, and there is no good way to get the parent table's relation from a TOAST relation. Here, I get the parent table's OID from the TOAST relation's name, and then get the parent table's relation.

5. This filtering takes INSERT/UPDATE/DELETE into account. Other changes have not been considered yet and can be handled in the future.

6. The table filter in pgoutput_change and the relation lookup in ReorderBufferProcessTXN can then be deleted. This has not been done yet; it is the next step.

Sincerely looking forward to your feedback.

Regards, lijie
Attachment
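Judging from the design points above (and the names filter_by_table_cb and FilterByTable that appear later in this thread), the proposed hook presumably has roughly this shape. This is a sketch under those assumptions, not the patch itself:

```
/* Sketch: proposed output-plugin callback, shape inferred from this thread. */
typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
											Relation relation,
											ReorderBufferChange *change);

/* Sketch: in decode.c, after a change is built and before it is queued. */
if (FilterByTable(ctx, change))
	return;				/* unpublished relation: never queue (or spill) it */
```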
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
Amit Kapila
Date:
On Fri, Dec 1, 2023 at 1:55 PM li jie <ggysxcq@gmail.com> wrote:
>
> > This is just an immature idea. I haven't started to implement it yet.
> > Maybe it was designed this way because there
> > are key factors that I didn't consider. So I want to hear everyone's
> > opinions, especially the designers of logic decoding.
>
> Attached is the patch I used to implement this optimization.
> The main designs are as follows:
> 1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.
>
> 2. Added this callback function pgoutput_table_filter for the pgoutput plugin.
>    Its main implementation is based on the table filter in the
>    pgoutput_change function.
>
> 3. After constructing a change and before Queue a change into a transaction,
>    use RelidByRelfilenumber to obtain the relation associated with the change,
>    just like obtaining the relation in the ReorderBufferProcessTXN function.
>
> 4. Relation may be a toast, and there is no good way to get its real
>    table relation based on toast relation. Here, I get the real table oid through
>    toast relname, and then get the real table relation.
>

This may be helpful for the case you have mentioned, but how about cases where there is nothing to filter by relation? It will add overhead related to the transaction start/end and others for each change. Currently, we do that just once for all the changes that need to be processed. I wonder why the spilling can't be avoided with the GUC logical_decoding_work_mem?

--
With Regards,
Amit Kapila.
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
li jie
Date:
> This may be helpful for the case you have mentioned but how about
> cases where there is nothing to filter by relation?

You can see the complete antecedent in the email [1]. A relation that has not been published will also generate changes that are put into the whole transaction group, which needlessly grows memory or disk usage.

> It will add
> overhead related to the transaction start/end and others for each
> change. Currently, we do that just once for all the changes that need
> to be processed.

Yes, at present it is only processed once, when each change is applied at transaction commit. This patch hopes to move that forward to the time when the change is constructed, deciding whether to queue the change based on whether the relation is published.

> I wonder why the spilling can't be avoided with GUC
> logical_decoding_work_mem?

Of course you can, but this will only convert disk space into memory space. For details, please see the case in email [1].

[1] https://www.postgresql.org/message-id/CAGfChW51P944nM5h0HTV9HistvVfwBxNaMt_s-OZ9t%3DuXz%2BZbg%40mail.gmail.com

Regards, lijie

Amit Kapila <amit.kapila16@gmail.com> wrote on Sat, Dec 2, 2023 at 12:11:
>
> On Fri, Dec 1, 2023 at 1:55 PM li jie <ggysxcq@gmail.com> wrote:
> >
> > > This is just an immature idea. I haven't started to implement it yet.
> > > Maybe it was designed this way because there
> > > are key factors that I didn't consider. So I want to hear everyone's
> > > opinions, especially the designers of logic decoding.
> >
> > Attached is the patch I used to implement this optimization.
> > The main designs are as follows:
> > 1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.
> >
> > 2. Added this callback function pgoutput_table_filter for the pgoutput plugin.
> >    Its main implementation is based on the table filter in the
> >    pgoutput_change function.
> >
> > 3. After constructing a change and before Queue a change into a transaction,
> >    use RelidByRelfilenumber to obtain the relation associated with the change,
> >    just like obtaining the relation in the ReorderBufferProcessTXN function.
> >
> > 4. Relation may be a toast, and there is no good way to get its real
> >    table relation based on toast relation. Here, I get the real table oid through
> >    toast relname, and then get the real table relation.
> >
>
> This may be helpful for the case you have mentioned but how about
> cases where there is nothing to filter by relation? It will add
> overhead related to the transaction start/end and others for each
> change. Currently, we do that just once for all the changes that need
> to be processed. I wonder why the spilling can't be avoided with GUC
> logical_decoding_work_mem?
>
> --
> With Regards,
> Amit Kapila.
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
Ajin Cherian
Date:
> Of course you can, but this will only convert disk space into memory space.
> For details, please see the case in Email [1].
>
> [1] https://www.postgresql.org/message-id/CAGfChW51P944nM5h0HTV9HistvVfwBxNaMt_s-OZ9t%3DuXz%2BZbg%40mail.gmail.com
>
> Regards, lijie
Hi lijie,
Overall, I think the patch is a good improvement. Some comments from a first run-through of the patch:
1. The patch no longer applies cleanly, please rebase.
2. While testing the patch, I saw something strange. If I try to truncate a table that is published, I still see the message:
2024-03-18 22:25:51.243 EDT [29385] LOG: logical filter change by table pg_class
2024-03-18 22:25:51.243 EDT [29385] LOG: logical filter change by table pg_class
This gives the impression that the truncate operation on the published table has been filtered, but it hasn't. Also, the log message needs to be reworded. Maybe: "Logical filtering change by non-published table <relation_name>"
3. Below code:
```
@@ -1201,11 +1343,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+
+		if (FilterByTable(ctx, change))
+			continue;;
```
extra semi-colon after continue.
4. I am not sure if this is possible, but is there a way to avoid the overhead in the patch if the publication publishes "ALL TABLES"?
5. In function: pgoutput_table_filter() - this code appears to be filtering out not just unpublished tables but also applying row based filters on published tables as well. Is this really within the scope of the feature?
regards,
Ajin Cherian
Fujitsu Australia
RE: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Li,

Thanks for proposing and sharing the PoC. Here are my high-level comments.

1. What if ALTER PUBLICATION ... ADD is executed in parallel? Should we publish added tables if the altering is done before the transaction is committed? The current patch seems unable to do so because changes for added tables have not been queued at COMMIT. If we should not publish such tables, why?

2. This patch could not apply as-is. Please rebase.

3. FilterByTable()

```
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+     return false;
```

filter_by_table_cb should be checked instead of filter_by_origin_cb. The current patch crashes if filter_by_table_cb() is not implemented.

4. DecodeSpecConfirm()

```
+ if (FilterByTable(ctx, change))
+     return;
+
```

I'm not sure this is needed. Can you explain the reason why you added it?

5. FilterByTable()

```
+ switch (change->action)
+ {
+     /* intentionally fall through */
+     case REORDER_BUFFER_CHANGE_INSERT:
+     case REORDER_BUFFER_CHANGE_UPDATE:
+     case REORDER_BUFFER_CHANGE_DELETE:
+         break;
+     default:
+         return false;
+ }
```

IIUC, REORDER_BUFFER_CHANGE_TRUNCATE also targets user tables, so I think it should be accepted. Thoughts?

6. I got strange errors when I tested the feature. I think this implies there are bugs in your patch.

1. implemented a no-op filter atop test_decoding like attached
2. ran `make check` for the test_decoding module
3. some tests failed. Note that "filter" was a test added by me. regression.diffs was also attached.

```
not ok 1  - ddl                 970 ms
ok     2  - xact                 36 ms
not ok 3  - rewrite             525 ms
not ok 4  - toast               736 ms
ok     5  - permissions          50 ms
ok     6  - decoding_in_xact     39 ms
not ok 7  - decoding_into_rel    57 ms
ok     8  - binary               21 ms
not ok 9  - prepared             33 ms
ok     10 - replorigin           93 ms
ok     11 - time                 25 ms
ok     12 - messages             47 ms
ok     13 - spill              8063 ms
ok     14 - slot                124 ms
ok     15 - truncate             37 ms
not ok 16 - stream               60 ms
ok     17 - stats               157 ms
ok     18 - twophase            122 ms
not ok 19 - twophase_stream      57 ms
not ok 20 - filter               20 ms
```

Currently I'm not 100% sure of the reason, but I think it may read the latest system catalog even if ALTER TABLE is executed after the changes. In the example below, an attribute is renamed text -> somenum after inserting data, but get_changes() outputs it as somenum:

```
 BEGIN
- table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:'1'
- table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 text[character varying]:'2'
+ table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 somenum[character varying]:'1'
+ table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 somenum[character varying]:'2'
 COMMIT
```

Also, if the relfilenumber of the relation is changed, an ERROR is raised:

```
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
- data
------------------------------------------------------------------------------
- BEGIN
- table public.tr_pkey: INSERT: id2[integer]:2 data[integer]:1 id[integer]:2
- COMMIT
- BEGIN
- table public.tr_pkey: DELETE: id[integer]:1
- table public.tr_pkey: DELETE: id[integer]:2
- COMMIT
-(7 rows)
-
+ERROR: could not map filenumber "base/16384/16397" to relation OID
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/
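For comment 3, the intended guard is presumably along these lines (a sketch):

```
	/*
	 * FilterByTable() must check the table-filter callback itself; a plugin
	 * that does not implement filter_by_table_cb should filter nothing.
	 */
	if (ctx->callbacks.filter_by_table_cb == NULL)
		return false;
```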
Attachment
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
Ajin Cherian
Date:
On Tue, Mar 19, 2024 at 1:49 PM Ajin Cherian <itsajin@gmail.com> wrote:
> Of course you can, but this will only convert disk space into memory space.
> For details, please see the case in Email [1].
>
> [1] https://www.postgresql.org/message-id/CAGfChW51P944nM5h0HTV9HistvVfwBxNaMt_s-OZ9t%3DuXz%2BZbg%40mail.gmail.com
>
> Regards, lijie
In some testing, I see a crash:
```
(gdb) bt
#0  0x00007fa5bcbfd277 in raise () from /lib64/libc.so.6
#1  0x00007fa5bcbfe968 in abort () from /lib64/libc.so.6
#2  0x00000000009e0940 in ExceptionalCondition (
    conditionName=conditionName@entry=0x7fa5ab8b9842 "RelationSyncCache != NULL",
    fileName=fileName@entry=0x7fa5ab8b9820 "pgoutput.c", lineNumber=lineNumber@entry=1991)
    at assert.c:66
#3  0x00007fa5ab8b7804 in get_rel_sync_entry (data=data@entry=0x2492288,
    relation=relation@entry=0x7fa5be30a768) at pgoutput.c:1991
#4  0x00007fa5ab8b7cda in pgoutput_table_filter (ctx=<optimized out>, relation=0x7fa5be30a768,
    change=0x24c5c20) at pgoutput.c:1671
#5  0x0000000000813761 in filter_by_table_cb_wrapper (ctx=ctx@entry=0x2491fd0,
    relation=relation@entry=0x7fa5be30a768, change=change@entry=0x24c5c20) at logical.c:1268
#6  0x000000000080e20f in FilterByTable (ctx=ctx@entry=0x2491fd0, change=change@entry=0x24c5c20)
    at decode.c:690
#7  0x000000000080e8e3 in DecodeInsert (ctx=ctx@entry=0x2491fd0, buf=buf@entry=0x7fff0db92550)
    at decode.c:1070
#8  0x000000000080f43d in heap_decode (ctx=ctx@entry=0x2491fd0, buf=buf@entry=0x7fff0db92550)
    at decode.c:485
#9  0x000000000080eca6 in LogicalDecodingProcessRecord (ctx=ctx@entry=0x2491fd0, record=0x2492368)
    at decode.c:118
#10 0x000000000081338f in DecodingContextFindStartpoint (ctx=ctx@entry=0x2491fd0) at logical.c:672
#11 0x000000000083c650 in CreateReplicationSlot (cmd=cmd@entry=0x2490970) at walsender.c:1323
#12 0x000000000083fd48 in exec_replication_command (
    cmd_string=cmd_string@entry=0x239c880 "CREATE_REPLICATION_SLOT \"pg_16387_sync_16384_7371301304766135621\" LOGICAL pgoutput (SNAPSHOT 'use')") at walsender.c:2116
```
The reason for the crash is that the RelationSyncCache was NULL prior to reaching a consistent point.
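A minimal guard of this shape would presumably avoid the assertion failure (a sketch, not necessarily the exact fix in the attached patch):

```
	/*
	 * pgoutput_table_filter(): the RelationSyncCache is only created once
	 * decoding reaches a consistent point, so until then we cannot decide
	 * whether a relation is published and must not filter anything.
	 */
	if (RelationSyncCache == NULL)
		return false;
```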
Hi li jie, I see that you created a new thread with an updated version of this patch [1]. I used that patch and addressed the crash seen above, rebased the patch and addressed a few other comments.
I'm happy to help you with this patch and address comments if you are not available.
regards,
Ajin Cherian
Fujitsu Australia
Attachment
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
Ajin Cherian
Date:
On Wed, May 22, 2024 at 2:17 PM Ajin Cherian <itsajin@gmail.com> wrote:
> The reason for the crash is that the RelationSyncCache was NULL prior to reaching a consistent point.
>
> Hi li jie, I see that you created a new thread with an updated version of this patch [1]. I used that patch and addressed the crash seen above, rebased the patch and addressed a few other comments.
>
> I'm happy to help you with this patch and address comments if you are not available.
>
> regards,
> Ajin Cherian
> Fujitsu Australia
I was discussing this with Kuroda-san, who made a patch to add a table filter to the test_decoding plugin. The filter does nothing, just returns false (doesn't filter anything), and I see that 8 test_decoding tests fail. In my analysis, I could see that some of the failures were because the new filter logic was accessing the relation cache using the latest snapshot for relids, which returned incorrect relation information while decoding attribute values.
For example:

```
CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
BEGIN;
INSERT INTO replication_example(somedata, text) VALUES (1, 1);
INSERT INTO replication_example(somedata, text) VALUES (1, 2);
COMMIT;
ALTER TABLE replication_example RENAME COLUMN text TO somenum;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
```
Here, after decoding, the changes for the INSERTs reflect the new column name (somenum), which was altered in a later transaction. This is because the new FilterByTable() logic reads the relcache with the latest snapshot and does not use a historic snapshot the way logical decoding should. The filtering happens at the decode.c level, not the reorderbuffer level, and does not have access to the txn from the reorderbuffer. This particular problem could be fixed by invalidating the cache in the FilterByTable() logic, but that does not solve other problems, e.g. where the table name itself is changed in a later transaction. I think the patch has a fundamental problem: the table filter logic does not respect the snapshot of the transaction being decoded. The historic snapshot is currently only set up when the actual changes are committed or streamed, in ReorderBufferProcessTXN().
If the purpose of the patch is to filter out unnecessary changes prior to the actual decode, then it will use an invalid snapshot and run into many problems. Otherwise, this logic has to be moved to the reorderbuffer level, and there will be a big overhead of reaching into the reorderbuffer while each change is queued to memory/disk.
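For context, the historic snapshot that FilterByTable() bypasses is only installed when the reorderbuffer replays a committed transaction, roughly as follows (simplified from ReorderBufferProcessTXN; a sketch, not a verbatim excerpt):

```
	/* Catalog lookups during replay run under a historic snapshot. */
	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

	reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
								  change->data.tp.rlocator.relNumber);
	relation = RelationIdGetRelation(reloid);	/* catalogs as of the txn */

	/* ... invoke the output plugin for the change ... */

	TeardownHistoricSnapshot(false);
```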
regards,
Ajin Cherian
Fujitsu Australia
Re: Proposal: Filter irrelevant change before reassemble transactions during logical decoding
From
Peter Smith
Date:
Hi Ajin,

Some review comments for patch v12-0001.

======
Commit message

1.
Track transactions which have snapshot changes with a new flag RBTXN_HAS_SNAPSHOT_CHANGES

~

The commit message only says *what* it does, but not *why* this patch even exists. TBH, I don't understand why this patch needs to be separated from your patch 0002, because 0001 makes no independent use of the flag, nor is it separately tested. Anyway, if it is going to remain separated, then IMO at least the message should explain the intended purpose, e.g. why the subsequent patches require this flagged info and how they will use it.

======
src/include/replication/reorderbuffer.h

2.
+/* Does this transaction have snapshot changes? */
+#define rbtxn_has_snapshot_changes(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_SNAPSHOT_CHANGES) != 0 \
+)
+

Is the wording below maybe a plainer way to say that:

/* Does this transaction make changes to the current snapshot? */

======
Kind Regards,
Peter Smith.
Fujitsu Australia