Re: Copy function for logical replication slots - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Copy function for logical replication slots
Date
Msg-id CAD21AoC4eoRadiEqhTnTDcMBhYo2_Ne9gQPRe3emNn5x9bO4QQ@mail.gmail.com
In response to Re: Copy function for logical replication slots  (Andres Freund <andres@anarazel.de>)
Responses Re: Copy function for logical replication slots
List pgsql-hackers
On Sat, Feb 16, 2019 at 12:34 PM Andres Freund <andres@anarazel.de> wrote:
>
> Hi,
>

Thank you for your comment.

> On 2019-01-15 10:56:04 +0900, Masahiko Sawada wrote:
>
> > +         <primary>pg_copy_physical_replication_slot</primary>
> > +        </indexterm>
> > +        <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>,<parameter>dst_slot_name</parameter> <optional>, <parameter>temporary</parameter> <type>bool</type></optional>)</function></literal>
> > +       </entry>
> > +       <entry>
> > +        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
> > +       </entry>
> > +       <entry>
> > +        Copies an existing physical replication slot named <parameter>src_slot_name</parameter>
> > +        to a physical replication slot named <parameter>dst_slot_name</parameter>.
> > +        The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
> > +        source slot.
> > +        <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
> > +        is omitted, the same value as the source slot is used.
> > +       </entry>
> > +      </row>
> > +
> > +      <row>
> > +       <entry>
> > +        <indexterm>
> > +         <primary>pg_copy_logical_replication_slot</primary>
> > +        </indexterm>
> > +        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>,<parameter>dst_slot_name</parameter> <optional>, <parameter>plugin</parameter> <type>name</type> <optional>,<parameter>temporary</parameter> <type>boolean</type></optional></optional>)</function></literal>
> > +       </entry>
> > +       <entry>
> > +        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
> > +       </entry>
> > +       <entry>
> > +        Copies an existing logical replication slot named <parameter>src_slot_name</parameter>
> > +        to a logical replication slot named <parameter>dst_slot_name</parameter>
> > +        while changing the output plugin and persistence. The copied logical slot starts
> > +        from the same <acronym>LSN</acronym> as the source logical slot. Both <parameter>plugin</parameter> and
> > +        <parameter>temporary</parameter> are optional. If <parameter>plugin</parameter>
> > +        or <parameter>temporary</parameter> are omitted, the same values as
> > +        the source logical slot are used.
> > +       </entry>
> > +      </row>
>
> Would it make sense to move the differing options to the end of the
> argument list? Right now we have a few common params, then a different
> one, and then another common one?

Agreed, will fix.

>
>
> > @@ -271,7 +272,7 @@ CreateInitDecodingContext(char *plugin,
> >       StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
> >       SpinLockRelease(&slot->mutex);
> >
> > -     ReplicationSlotReserveWal();
> > +     ReplicationSlotReserveWal(restart_lsn);
>
> Why do we even need to call this? It ought to be guaranteed that there's
> sufficient WAL, right?  And somehow it seems harder to understand to me
> that the reserve routine gets an LSN.

That's right for the copy-function case. I'll change
CreateInitDecodingContext() so that, if the passed-in restart_lsn is
valid, it sets the start LSN directly without calling the WAL
reservation routine. The caller must then guarantee that the WAL from
that LSN is still available.

>
>
> >  /*
> >   * Reserve WAL for the currently active slot.
> >   *
> > - * Compute and set restart_lsn in a manner that's appropriate for the type of
> > - * the slot and concurrency safe.
> > + * If an lsn to reserve is not requested, compute and set restart_lsn
> > + * in a manner that's appropriate for the type of the slot and concurrency safe.
> > + * If the reserved WAL is requested, set restart_lsn and check if the corresponding
> > + * wal segment is available.
> >   */
> >  void
> > -ReplicationSlotReserveWal(void)
> > +ReplicationSlotReserveWal(XLogRecPtr requested_lsn)
> >  {
> >       ReplicationSlot *slot = MyReplicationSlot;
> >
> > @@ -1005,47 +1007,57 @@ ReplicationSlotReserveWal(void)
> >        * The replication slot mechanism is used to prevent removal of required
> >        * WAL. As there is no interlock between this routine and checkpoints, WAL
> >        * segments could concurrently be removed when a now stale return value of
> > -      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
> > -      * this happens we'll just retry.
> > +      * ReplicationSlotsComputeRequiredLSN() is used. If the lsn to reserve is
> > +      * not requested, in the unlikely case that this happens we'll just retry.
> >        */
> >       while (true)
> >       {
> >               XLogSegNo       segno;
> >               XLogRecPtr      restart_lsn;
> >
> > -             /*
> > -              * For logical slots log a standby snapshot and start logical decoding
> > -              * at exactly that position. That allows the slot to start up more
> > -              * quickly.
> > -              *
> > -              * That's not needed (or indeed helpful) for physical slots as they'll
> > -              * start replay at the last logged checkpoint anyway. Instead return
> > -              * the location of the last redo LSN. While that slightly increases
> > -              * the chance that we have to retry, it's where a base backup has to
> > -              * start replay at.
> > -              */
> > -             if (!RecoveryInProgress() && SlotIsLogical(slot))
> > +             if (!XLogRecPtrIsInvalid(requested_lsn))
> >               {
> > -                     XLogRecPtr      flushptr;
> > -
> > -                     /* start at current insert position */
> > -                     restart_lsn = GetXLogInsertRecPtr();
> > +                     /* Set the requested lsn */
> >                       SpinLockAcquire(&slot->mutex);
> > -                     slot->data.restart_lsn = restart_lsn;
> > +                     slot->data.restart_lsn = requested_lsn;
> >                       SpinLockRelease(&slot->mutex);
> > -
> > -                     /* make sure we have enough information to start */
> > -                     flushptr = LogStandbySnapshot();
> > -
> > -                     /* and make sure it's fsynced to disk */
> > -                     XLogFlush(flushptr);
> >               }
> >               else
> >               {
> > -                     restart_lsn = GetRedoRecPtr();
> > -                     SpinLockAcquire(&slot->mutex);
> > -                     slot->data.restart_lsn = restart_lsn;
> > -                     SpinLockRelease(&slot->mutex);
> > +                     /*
> > +                      * For logical slots log a standby snapshot and start logical decoding
> > +                      * at exactly that position. That allows the slot to start up more
> > +                      * quickly.
> > +                      *
> > +                      * That's not needed (or indeed helpful) for physical slots as they'll
> > +                      * start replay at the last logged checkpoint anyway. Instead return
> > +                      * the location of the last redo LSN. While that slightly increases
> > +                      * the chance that we have to retry, it's where a base backup has to
> > +                      * start replay at.
> > +                      */
> > +                     if (!RecoveryInProgress() && SlotIsLogical(slot))
> > +                     {
> > +                             XLogRecPtr      flushptr;
> > +
> > +                             /* start at current insert position */
> > +                             restart_lsn = GetXLogInsertRecPtr();
> > +                             SpinLockAcquire(&slot->mutex);
> > +                             slot->data.restart_lsn = restart_lsn;
> > +                             SpinLockRelease(&slot->mutex);
> > +
> > +                             /* make sure we have enough information to start */
> > +                             flushptr = LogStandbySnapshot();
> > +
> > +                             /* and make sure it's fsynced to disk */
> > +                             XLogFlush(flushptr);
> > +                     }
> > +                     else
> > +                     {
> > +                             restart_lsn = GetRedoRecPtr();
> > +                             SpinLockAcquire(&slot->mutex);
> > +                             slot->data.restart_lsn = restart_lsn;
> > +                             SpinLockRelease(&slot->mutex);
> > +                     }
> >               }
> >
> >               /* prevent WAL removal as fast as possible */
> > @@ -1061,6 +1073,21 @@ ReplicationSlotReserveWal(void)
> >               XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
> >               if (XLogGetLastRemovedSegno() < segno)
> >                       break;
>
> This seems like it's harder to understand than before. The loop (and
> most of the rest of the function) doesn't make sense for the copy case,
> so I think it'd be better to just move this into a separate function
> that just verifies that all the WAL is there.

Agreed, will fix.

>
>
> > +             /*
> > +              * The caller has requested a specific wal which we failed to reserve.
> > +              * We can't retry here as the requested wal is no longer available.
> > +              */
> > +             if (!XLogRecPtrIsInvalid(requested_lsn))
> > +             {
> > +                     char filename[MAXFNAMELEN];
> > +
> > +                     XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size);
> > +                     ereport(ERROR,
> > +                                     (errcode(ERRCODE_UNDEFINED_FILE),
> > +                                      errmsg("requested WAL segment %s has already been removed",
> > +                                                     filename)));
> > +             }
> >       }
> >  }
>
> This ought to be unreachable, right?

Right.

>
>
>
>
> > +/*
> > + * Copy physical replication slot (3 arguments)
> > + *
> > + * note: this wrapper is necessary to pass the sanity check in opr_sanity,
> > + * which checks that all built-in functions that share the implementing C
> > + * function take the same number of arguments
> > + */
> > +Datum
> > +pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS)
> > +{
> > +     return pg_copy_physical_replication_slot(fcinfo);
> > +}
>
> You could avoid this by just defining the wrapper on the SQL level, but
> I'm ok with this.
>
>
> > +     /*
> > +      * To prevent the restart_lsn WAL of the source slot from removal
> > +      * during copying a new slot, we copy it while holding the source slot.
> > +      * Since we are not allowed to create a new one while holding another
> > +      * one, we temporarily save the acquired slot and restore it after
> > +      * creation. Set callback function to ensure we release replication
> > +      * slots if fail below.
> > +      */
> >       if (immediately_reserve)
> > +             saveslot = MyReplicationSlot;
> > +     else
> > +             ReplicationSlotRelease();
>
> Yikes, this is mightily ugly.
>
> Stupid question, but couldn't we optimize this to something like:
>
>     /*
>      * First copy current data of the slot. Then install those in the
>      * new slot. The src slot could have progressed while installing,
>      * but the installed values prevent global horizons from progressing
>      * further. Therefore a second copy is sufficiently up2date.
>      */
>     SpinLockAcquire(&src->mutex);
>     copy_lsn = src->data.restart_lsn;
>     copy_xid = ...;
>     SpinLockRelease(&src->mutex);
>
>     /* install copied values */
>
>
>     SpinLockAcquire(&src->mutex);
>     /* copy data of slot again */
>     SpinLockRelease(&src->mutex);
>
>     /* install again */
>
> ?

With this optimization we no longer need to acquire the source slot,
so we can copy even from a slot that is currently acquired by another
process, which is great. However, isn't it possible that, once the
first spinlock on the source slot is released, the source slot is
dropped and the global horizons advance before we install the copied
values?

Regards,

--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

