Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions |
Date | |
Msg-id | CAA4eK1LT2UArE9FyPqZcmV2kALhEmcXnZGdreYWCdCJ9vRUd8Q@mail.gmail.com |
In response to | Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions (Dilip Kumar <dilipbalaut@gmail.com>) |
Responses |
Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions |
List | pgsql-hackers |
On Wed, Feb 5, 2020 at 9:46 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> Fixed in the latest version sent upthread.
>

Okay, thanks. I haven't looked at the latest version of the patch series, as I was reviewing the previous version, and I think all of these comments apply to patches that have not been modified. Here are my comments:

I think we don't need to maintain v8-0007-Support-logical_decoding_work_mem-set-from-create, as per the discussion in one of the above emails [1], because its usage is not clear.

v8-0008-Add-support-for-streaming-to-built-in-replication

1.
-      information.  The allowed options are <literal>slot_name</literal> and
-      <literal>synchronous_commit</literal>
+      information.  The allowed options are <literal>slot_name</literal>,
+      <literal>synchronous_commit</literal>, <literal>work_mem</literal>
+      and <literal>streaming</literal>.

As per the discussion above [1], I don't think we need work_mem here. You might want to remove its other usages from the patch as well.

2.
@@ -59,7 +59,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
     bool *slot_name_given, char **slot_name,
     bool *copy_data, char **synchronous_commit,
     bool *refresh, int *logical_wm,
-    bool *logical_wm_given)
+    bool *logical_wm_given, bool *streaming,
+    bool *streaming_given)

It is not clear to me why we need two parameters, 'streaming' and 'streaming_given', in this API. Can't we handle it the same way as the 'refresh' parameter?

3.
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index aec885e..e80d00c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -14,6 +14,8 @@
  *
  *-------------------------------------------------------------------------
  */
+#include <sys/types.h>
+#include <unistd.h>

 #include "postgres.h"

I see only the above change in launcher.c. Why do we need to include these headers if there is no other change (at least not in this patch)?

4.
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
  /* Push callback + info on the error context stack */
  state.ctx = ctx;
  state.callback_name = "stream_start";
- /* state.report_location = apply_lsn; */
+ state.report_location = InvalidXLogRecPtr;
  errcallback.callback = output_plugin_error_callback;
  errcallback.arg = (void *) &state;
  errcallback.previous = error_context_stack;
@@ -1194,7 +1194,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
  /* Push callback + info on the error context stack */
  state.ctx = ctx;
  state.callback_name = "stream_stop";
- /* state.report_location = apply_lsn; */
+ state.report_location = InvalidXLogRecPtr;
  errcallback.callback = output_plugin_error_callback;
  errcallback.arg = (void *) &state;
  errcallback.previous = error_context_stack;

Don't we want to set report_location to txn->final_lsn, as we do in a few other places?

5.
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation rel, HeapTuple oldtuple)
 {
+ pq_sendbyte(out, 'D'); /* action DELETE */
+
  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
         rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
         rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
- pq_sendbyte(out, 'D'); /* action DELETE */

Why does this patch need to change the above code?

6.
+void
+logicalrep_write_stream_start(StringInfo out,
+   TransactionId xid, bool first_segment)
+{
+ pq_sendbyte(out, 'S'); /* action STREAM START */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+
+ /* 1 if this is the first streaming segment for this xid */
+ pq_sendint32(out, first_segment ? 1 : 0);
+}
+
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+ TransactionId xid;
+
+ Assert(first_segment);
+
+ xid = pq_getmsgint(in, 4);
+ *first_segment = (pq_getmsgint(in, 4) == 1);
+
+ return xid;
+}

These functions use pq_sendint32 to send a bool. Can't we use pq_sendbyte, similar to what we do in boolsend?

7.
+void
+logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+}

The comment says 'starting to stream', whereas this function stops the stream.

8.
+void
+logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+}
+
+TransactionId
+logicalrep_read_stream_stop(StringInfo in)
+{
+ TransactionId xid;
+
+ xid = pq_getmsgint(in, 4);
+
+ return xid;
+}

Is there a reason to send the xid when stopping the stream? I don't see any use of the function logicalrep_read_stream_stop.

9.
+ * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
{
..
+ pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
..
+ pgstat_report_wait_end();
..
}

I see the calls to pgstat_report_wait_start/pgstat_report_wait_end in this function, so I am not sure the above comment makes sense.

10.
+ * The files are placed in /tmp by default, and the filenames include both
+ * the XID of the toplevel transaction and OID of the subscription.

Are we keeping the files in /tmp or in pg's temp tablespace directory? Judging by the code below, it doesn't seem that we place them in /tmp. If I am correct, can you update the comment?

+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ char tempdirpath[MAXPGPATH];
+
+ TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);

11.
+ * The change is serialied in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ *
..
+ */
+static void
+stream_write_change(char action, StringInfo s)

The part of the comment which says "with length (not including the length) .." is not clear to me. What does "not including the length" mean?

12.
+ * TODO: Add missing_ok flag to specify in which cases it's OK not to
+ * find the files, and when it's an error.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)

I think we can implement this TODO. It is clear that when this function is called from apply_handle_stream_commit, the files must exist. We can similarly analyze the other callers of this API.

13.
+apply_handle_stream_abort(StringInfo s)
{
..
+ /* FIXME optimize the search by bsearch on sorted data */
+ for (i = nsubxacts; i > 0; i--)
..

I am not sure how important this optimization is, so instead of a FIXME, it is better to keep it as an XXX comment. If we hit a performance issue because of this in the future, we can revisit the decision.

[1] - https://www.postgresql.org/message-id/CAA4eK1LH7xzF%2B-qHRv9EDXQTFYjPUYZw5B7FSK9QLEg7F603OQ%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
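
To make point 6 concrete, here is a minimal standalone sketch of a STREAM START message that carries first_segment as a single byte rather than a 4-byte integer. It does not use the PostgreSQL headers: DemoBuf and all demo_* functions are hypothetical stand-ins for StringInfo and for pq_sendbyte, pq_sendint32, pq_getmsgint and pq_getmsgbyte, written only to show the byte layout. As in the quoted patch, the reader assumes the action byte has already been consumed by the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo: a small buffer with a read cursor. */
typedef struct DemoBuf
{
    uint8_t data[64];
    size_t  len;        /* bytes written so far */
    size_t  cursor;     /* bytes consumed so far */
} DemoBuf;

static void
demo_sendbyte(DemoBuf *buf, uint8_t b)
{
    assert(buf->len < sizeof(buf->data));
    buf->data[buf->len++] = b;
}

/* Append a 32-bit value in network byte order (most significant byte first). */
static void
demo_sendint32(DemoBuf *buf, uint32_t v)
{
    demo_sendbyte(buf, (uint8_t) (v >> 24));
    demo_sendbyte(buf, (uint8_t) (v >> 16));
    demo_sendbyte(buf, (uint8_t) (v >> 8));
    demo_sendbyte(buf, (uint8_t) v);
}

static uint8_t
demo_getbyte(DemoBuf *buf)
{
    assert(buf->cursor < buf->len);
    return buf->data[buf->cursor++];
}

static uint32_t
demo_getint32(DemoBuf *buf)
{
    uint32_t v = 0;

    for (int i = 0; i < 4; i++)
        v = (v << 8) | demo_getbyte(buf);
    return v;
}

/* Layout: 'S' (1 byte), xid (4 bytes), first_segment (1 byte). */
static void
demo_write_stream_start(DemoBuf *out, uint32_t xid, bool first_segment)
{
    demo_sendbyte(out, 'S');
    demo_sendint32(out, xid);
    demo_sendbyte(out, first_segment ? 1 : 0);  /* bool as one byte */
}

/* Caller has already consumed the action byte. */
static uint32_t
demo_read_stream_start(DemoBuf *in, bool *first_segment)
{
    uint32_t xid = demo_getint32(in);

    *first_segment = (demo_getbyte(in) != 0);
    return xid;
}
```

With this layout the message is 6 bytes instead of 9, and the flag is encoded the same way on both the write and the read side.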
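
On point 11, one possible reading of "length (not including the length)" is that the stored length counts the action byte plus the message contents, but not the bytes of the length field itself. The sketch below illustrates that reading only; it is an assumption, not a statement of what stream_write_change does. The functions demo_write_change and demo_read_change are hypothetical, and they work on an in-memory buffer rather than a file.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Assumed record layout: [int len][char action][payload ...]
 * where len = 1 + payload_len, i.e. it excludes sizeof(len) itself.
 * Returns the total number of bytes written, or 0 if dst is too small.
 */
static size_t
demo_write_change(uint8_t *dst, size_t dst_size, char action,
                  const void *payload, size_t payload_len)
{
    int    len = (int) (sizeof(char) + payload_len);
    size_t total = sizeof(len) + (size_t) len;

    if (total > dst_size)
        return 0;

    memcpy(dst, &len, sizeof(len));             /* length prefix */
    dst[sizeof(len)] = (uint8_t) action;        /* action code */
    if (payload_len > 0)
        memcpy(dst + sizeof(len) + 1, payload, payload_len);
    return total;
}

/*
 * Reads one record. Returns the total bytes consumed, or 0 if the
 * buffer is truncated or the length prefix is invalid.
 */
static size_t
demo_read_change(const uint8_t *src, size_t src_size, char *action,
                 const uint8_t **payload, size_t *payload_len)
{
    int len;

    if (src_size < sizeof(len))
        return 0;
    memcpy(&len, src, sizeof(len));
    if (len < 1 || src_size - sizeof(len) < (size_t) len)
        return 0;

    *action = (char) src[sizeof(len)];
    *payload = src + sizeof(len) + 1;
    *payload_len = (size_t) len - 1;
    return sizeof(len) + (size_t) len;
}
```

Under this reading, a reader first fetches sizeof(int) bytes, then exactly len more bytes, which is why the prefix does not need to count itself. If that is the intent, the comment could say so explicitly.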
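
For point 13, if the search ever does need optimizing, the bsearch variant mentioned in the FIXME could look roughly like the sketch below. It rests on two assumptions that the patch does not guarantee: the subxact XIDs are kept in an array sorted in ascending numeric order, and XID wraparound is ignored. DemoXid, demo_xid_cmp and demo_find_subxact are hypothetical names, not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t DemoXid;       /* stand-in for TransactionId */

/* Plain numeric comparison; assumes no XID wraparound within the array. */
static int
demo_xid_cmp(const void *a, const void *b)
{
    DemoXid xa = *(const DemoXid *) a;
    DemoXid xb = *(const DemoXid *) b;

    return (xa > xb) - (xa < xb);
}

/*
 * Returns the index of target in the sorted array xids, or -1 if it
 * is not present.
 */
static long
demo_find_subxact(const DemoXid *xids, size_t nxids, DemoXid target)
{
    const DemoXid *hit;

    if (nxids == 0)
        return -1;

    hit = bsearch(&target, xids, nxids, sizeof(DemoXid), demo_xid_cmp);
    return hit ? (long) (hit - xids) : -1;
}
```

This replaces the linear scan with an O(log n) lookup, but only pays off if keeping the array sorted is cheap, which is part of why deferring it behind an XXX comment seems reasonable.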