Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers

From Dilip Kumar
Subject Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date
Msg-id CAFiTN-u1MCP5gj=d=up3sokzm05G1y=7sYXF=hrjufAsS9dn3g@mail.gmail.com
In response to Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
List pgsql-hackers
On Tue, Jun 16, 2020 at 2:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Jun 15, 2020 at 6:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > I have few more comments on the patch
> > 0013-Change-buffile-interface-required-for-streaming-.patch:
> >
>
> Review comments on 0014-Worker-tempfile-use-the-shared-buffile-infrastru:
> 1.
> The subxact file is only create if there
> + * are any suxact info under this xid.
> + */
> +typedef struct StreamXidHash
>
> Lets slightly reword the part of the comment as "The subxact file is
> created iff there is any suxact info under this xid."

Done

>
> 2.
> @@ -710,6 +740,9 @@ apply_handle_stream_stop(StringInfo s)
>   subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
>   stream_close_file();
>
> + /* Commit the per-stream transaction */
> + CommitTransactionCommand();
>
> Before calling commit, ensure that we are in a valid transaction.  I
> think we can have an Assert for IsTransactionState().

Done

> 3.
> @@ -761,11 +791,13 @@ apply_handle_stream_abort(StringInfo s)
>
>   int64 i;
>   int64 subidx;
> - int fd;
> + BufFile    *fd;
>   bool found = false;
>   char path[MAXPGPATH];
> + StreamXidHash *ent;
>
>   subidx = -1;
> + ensure_transaction();
>   subxact_info_read(MyLogicalRepWorker->subid, xid);
>
> Why to call ensure_transaction here?  Is there any reason that we
> won't have a valid transaction by now?  If not, then its better to
> have an Assert for IsTransactionState().

We only start a transaction at stream_start and commit it at
stream_stop, so at stream_abort we will not have a transaction open.
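
To illustrate the point, here is a minimal standalone model of that
lifecycle.  It is not the patch code: every model_* name is a
hypothetical stand-in for the backend function named in its comment,
and committing at the end of the abort handler is an assumption of the
model.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in for the backend's transaction state (illustrative only). */
static bool model_in_xact = false;

/* Models IsTransactionState(): is a transaction currently open? */
static bool
model_is_transaction_state(void)
{
    return model_in_xact;
}

/*
 * Models ensure_transaction(): open a transaction only if none is open.
 * Returns true when it actually had to start one.
 */
static bool
model_ensure_transaction(void)
{
    if (model_in_xact)
        return false;
    model_in_xact = true;
    return true;
}

/* Models CommitTransactionCommand(), guarded as suggested in comment 2. */
static void
model_commit(void)
{
    assert(model_is_transaction_state());
    model_in_xact = false;
}

/* stream_start opens the per-stream transaction ... */
static void
model_stream_start(void)
{
    model_ensure_transaction();
}

/* ... and stream_stop commits it. */
static void
model_stream_stop(void)
{
    model_commit();
}

/*
 * stream_abort arrives outside any start/stop pair, so it has to open its
 * own transaction.  Committing at the end is an assumption of this model.
 * Returns true if a new transaction had to be started.
 */
static bool
model_stream_abort(void)
{
    bool started = model_ensure_transaction();

    /* (reading the subxact info would happen here) */
    model_commit();
    return started;
}
```

Calling model_stream_start(), model_stream_stop() and then
model_stream_abort() makes the last call return true, which is why an
Assert for IsTransactionState() at that point would fire.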

> 4.
> - if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
> + if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
>   {
> - int save_errno = errno;
> + int save_errno = errno;
>
> - CloseTransientFile(fd);
> + BufFileClose(fd);
>
> On error, won't these files be close automatically?  If so, why at
> this place and before other errors, we need to close this?

Yes, that's correct.  I have fixed those.

> 5.
> if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len))
> {
> int save_errno = errno;
>
> BufFileClose(fd);
> errno = save_errno;
> ereport(ERROR,
> (errcode_for_file_access(),
> errmsg("could not read file \"%s\": %m",
>
> Can we change the error message to "could not read from streaming
> transactions file .." or something like that and similarly we can
> change the message for failure in reading changes file?

Done


> 6.
> if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
> {
> int save_errno = errno;
>
> BufFileClose(fd);
> errno = save_errno;
> ereport(ERROR,
> (errcode_for_file_access(),
> errmsg("could not write to file \"%s\": %m",
>
> Similar to previous, can we change it to "could not write to streaming
> transactions file

BufFileWrite no longer returns failure, so there is no error to report here.

> 7.
> @@ -2855,17 +2844,32 @@ stream_open_file(Oid subid, TransactionId xid,
> bool first_segment)
>   * for writing, in append mode.
>   */
>   if (first_segment)
> - flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
> - else
> - flags = (O_WRONLY | O_APPEND | PG_BINARY);
> + {
> + /*
> + * Shared fileset handle must be allocated in the persistent context.
> + */
> + SharedFileSet *fileset =
> + MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet));
>
> - stream_fd = OpenTransientFile(path, flags);
> + PrepareTempTablespaces();
> + SharedFileSetInit(fileset, NULL);
>
> Why are we calling PrepareTempTablespaces here? It is already called
> in SharedFileSetInit.

My bad.  At first I tried using SharedFileSetInit, but it later got
changed and I forgot to remove this part.

> 8.
> + /*
> + * Start a transaction on stream start, this transaction will be committed
> + * on the stream stop.  We need the transaction for handling the buffile,
> + * used for serializing the streaming data and subxact info.
> + */
> + ensure_transaction();
>
> I think we need this for PrepareTempTablespaces to set the
> temptablespaces.  Also, isn't it required for a cleanup of buffile
> resources at the transaction end?  Are there any other reasons for it
> as well?  The comment should be a bit more clear for why we need a
> transaction here.

I am not sure whether it makes sense to add a comment here explaining
why buffile and sharedfileset need a transaction.  Do you think we
should instead add a comment in the buffile/shared fileset API saying
that it must be called within a transaction?

> 9.
> * Open a file for streamed changes from a toplevel transaction identified
>  * by stream_xid (global variable). If it's the first chunk of streamed
>  * changes for this transaction, perform cleanup by removing existing
>  * files after a possible previous crash.
> ..
> stream_open_file(Oid subid, TransactionId xid, bool first_segment)
>
> The above part comment atop stream_open_file needs to be changed after
> new implementation.

Done

> 10.
>  * enabled.  This context is reeset on each stream stop.
> */
> LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
>
> /reeset/reset

Done


> 11.
> stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok)
> {
> ..
> + /* No entry created for this xid so simply return. */
> + if (ent == NULL)
> + return;
> ..
> }
>
> Is there any reason or scenario where this ent can be NULL?  If not,
> it will be better to have an Assert for the same.

Right, it should be an Assert.  Even if all the changes are ignored for
the top transaction, we should have sent the stream_start.

> 12.
> subxact_info_write(Oid subid, TransactionId xid)
> {
> ..
> + /*
> + * If there is no subtransaction then nothing to do,  but if already have
> + * subxact file then delete that.
> + */
> + if (nsubxacts == 0)
>   {
> - ereport(ERROR,
> - (errcode_for_file_access(),
> - errmsg("could not create file \"%s\": %m",
> - path)));
> + if (ent->subxact_fileset)
> + {
> + cleanup_subxact_info();
> + BufFileDeleteShared(ent->subxact_fileset, path);
> + ent->subxact_fileset = NULL;
> ..
> }
>
> Here don't we need to free the subxact_fileset before setting it to NULL?

Yes, done
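
For reference, the pattern is simply "release, then clear".  The sketch
below is a standalone model, not the patch code: the Model* types and
model_* functions are hypothetical, and it uses malloc()/free() where
the backend would use its memory-context allocator and pfree().

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a separately allocated file set. */
typedef struct ModelFileSet
{
    int         nfiles;
} ModelFileSet;

/* Hypothetical stand-in for the per-xid hash entry. */
typedef struct ModelXidEntry
{
    ModelFileSet *subxact_fileset;
} ModelXidEntry;

/* Number of file sets currently allocated; lets a caller detect leaks. */
static int  model_live_filesets = 0;

/* Allocate a file set and count it as live. */
static ModelFileSet *
model_fileset_create(void)
{
    ModelFileSet *fs = malloc(sizeof(ModelFileSet));

    if (fs == NULL)
        abort();
    fs->nfiles = 0;
    model_live_filesets++;
    return fs;
}

/*
 * Drop the entry's subxact file set: free the memory first, then clear
 * the pointer.  Clearing the pointer alone would leak the allocation.
 * Safe to call when no file set exists.
 */
static void
model_drop_subxact_fileset(ModelXidEntry *ent)
{
    if (ent->subxact_fileset == NULL)
        return;

    free(ent->subxact_fileset);
    model_live_filesets--;
    ent->subxact_fileset = NULL;
}
```

Because the pointer is cleared after the free, a second call on the
same entry is a no-op rather than a double free.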

> 13.
> + /*
> + * Scan complete hash and delete the underlying files for the the xids.
> + * Also delete the memory for the shared file sets.
> + */
>
> /the the/the.  Instead of "delete the memory", it would be better to
> say "release the memory".

Done

>
> 14.
> + /*
> + * We might not have created the suxact fileset if there is no sub
> + * transaction.
> + */
>
> /suxact/subxact

Done




--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


