Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date
Msg-id CAA4eK1K_nv2AvqQz0Fi9tBfKdmj3KnCVWU=R3zwhH2wqU8EVyA@mail.gmail.com
In response to Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
List pgsql-hackers
On Mon, Jun 15, 2020 at 6:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> I have few more comments on the patch
> 0013-Change-buffile-interface-required-for-streaming-.patch:
>

Review comments on 0014-Worker-tempfile-use-the-shared-buffile-infrastru:
1.
The subxact file is only create if there
+ * are any suxact info under this xid.
+ */
+typedef struct StreamXidHash

Let's slightly reword this part of the comment as "The subxact file is
created iff there is any subxact info under this xid."

2.
@@ -710,6 +740,9 @@ apply_handle_stream_stop(StringInfo s)
  subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
  stream_close_file();

+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();

Before calling commit, ensure that we are in a valid transaction.  I
think we can have an Assert for IsTransactionState().
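
A minimal sketch of the suggested check follows. The transaction functions here are simplified stand-ins that only track a flag (they are not the real PostgreSQL implementations), stream_stop_sketch is a hypothetical name for the tail of apply_handle_stream_stop, and the standard assert() stands in for the backend's Assert() macro.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the backend's transaction state; illustration only. */
bool in_transaction = false;

bool IsTransactionState(void) { return in_transaction; }
void StartTransactionCommand(void) { in_transaction = true; }
void CommitTransactionCommand(void) { in_transaction = false; }

/* Hypothetical, simplified tail of apply_handle_stream_stop(). */
void stream_stop_sketch(void)
{
    /* The transaction started at stream start must still be open. */
    assert(IsTransactionState());

    /* Commit the per-stream transaction */
    CommitTransactionCommand();
}
```

With this in place, reaching the commit without an open transaction fails loudly in assert-enabled builds instead of passing unnoticed.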

3.
@@ -761,11 +791,13 @@ apply_handle_stream_abort(StringInfo s)

  int64 i;
  int64 subidx;
- int fd;
+ BufFile    *fd;
  bool found = false;
  char path[MAXPGPATH];
+ StreamXidHash *ent;

  subidx = -1;
+ ensure_transaction();
  subxact_info_read(MyLogicalRepWorker->subid, xid);

Why call ensure_transaction here?  Is there any reason that we
won't have a valid transaction by now?  If not, then it's better to
have an Assert for IsTransactionState().

4.
- if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
+ if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
  {
- int save_errno = errno;
+ int save_errno = errno;

- CloseTransientFile(fd);
+ BufFileClose(fd);

On error, won't these files be closed automatically?  If so, why do we
need to close them here and before the other errors?

5.
if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len))
{
int save_errno = errno;

BufFileClose(fd);
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m",

Can we change the error message to "could not read from streaming
transactions file .." or something like that and similarly we can
change the message for failure in reading changes file?

6.
if (BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
{
int save_errno = errno;

BufFileClose(fd);
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",

Similar to the previous comment, can we change it to "could not write to
streaming transactions file .."?

7.
@@ -2855,17 +2844,32 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
  * for writing, in append mode.
  */
  if (first_segment)
- flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
- else
- flags = (O_WRONLY | O_APPEND | PG_BINARY);
+ {
+ /*
+ * Shared fileset handle must be allocated in the persistent context.
+ */
+ SharedFileSet *fileset =
+ MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet));

- stream_fd = OpenTransientFile(path, flags);
+ PrepareTempTablespaces();
+ SharedFileSetInit(fileset, NULL);

Why are we calling PrepareTempTablespaces here? It is already called
in SharedFileSetInit.

8.
+ /*
+ * Start a transaction on stream start, this transaction will be committed
+ * on the stream stop.  We need the transaction for handling the buffile,
+ * used for serializing the streaming data and subxact info.
+ */
+ ensure_transaction();

I think we need this for PrepareTempTablespaces to set the
temptablespaces.  Also, isn't it required for a cleanup of buffile
resources at the transaction end?  Are there any other reasons for it
as well?  The comment should be clearer about why we need a
transaction here.

9.
* Open a file for streamed changes from a toplevel transaction identified
 * by stream_xid (global variable). If it's the first chunk of streamed
 * changes for this transaction, perform cleanup by removing existing
 * files after a possible previous crash.
..
stream_open_file(Oid subid, TransactionId xid, bool first_segment)

The above part of the comment atop stream_open_file needs to be changed
to match the new implementation.

10.
 * enabled.  This context is reeset on each stream stop.
*/
LogicalStreamingContext = AllocSetContextCreate(ApplyContext,

/reeset/reset

11.
stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok)
{
..
+ /* No entry created for this xid so simply return. */
+ if (ent == NULL)
+ return;
..
}

Is there any reason or scenario where this ent can be NULL?  If not,
it would be better to have an Assert for it.

12.
subxact_info_write(Oid subid, TransactionId xid)
{
..
+ /*
+ * If there is no subtransaction then nothing to do,  but if already have
+ * subxact file then delete that.
+ */
+ if (nsubxacts == 0)
  {
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m",
- path)));
+ if (ent->subxact_fileset)
+ {
+ cleanup_subxact_info();
+ BufFileDeleteShared(ent->subxact_fileset, path);
+ ent->subxact_fileset = NULL;
..
}

Here don't we need to free the subxact_fileset before setting it to NULL?
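
The ordering in question can be sketched as below, under stated assumptions: SketchEntry is a hypothetical stand-in for the hash entry (the real StreamXidHash has more fields), sketch_drop_subxact_fileset is an illustrative name, and malloc/free stand in for the backend's memory-context allocation and pfree().

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the per-xid hash entry. */
typedef struct SketchEntry
{
    void *subxact_fileset;  /* allocated on first use, NULL when absent */
} SketchEntry;

/* Release the fileset memory first, then clear the pointer. */
void sketch_drop_subxact_fileset(SketchEntry *ent)
{
    assert(ent != NULL);

    if (ent->subxact_fileset != NULL)
    {
        free(ent->subxact_fileset);     /* pfree() in the backend */
        ent->subxact_fileset = NULL;    /* avoid a dangling pointer */
    }
}
```

Setting the pointer to NULL without the preceding free would leave the allocation unreachable until its memory context is reset or deleted.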

13.
+ /*
+ * Scan complete hash and delete the underlying files for the the xids.
+ * Also delete the memory for the shared file sets.
+ */

/the the/the.  Instead of "delete the memory", it would be better to
say "release the memory".

14.
+ /*
+ * We might not have created the suxact fileset if there is no sub
+ * transaction.
+ */

/suxact/subxact

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


