Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From Andres Freund
Subject Re: Minimal logical decoding on standbys
Date
Msg-id 20190710031424.dfhzlqj2tpvznibe@alap3.anarazel.de
In response to Re: Minimal logical decoding on standbys  (Amit Khandekar <amitdkhan.pg@gmail.com>)
Responses Re: Minimal logical decoding on standbys  (Robert Haas <robertmhaas@gmail.com>)
Re: Minimal logical decoding on standbys  (Amit Khandekar <amitdkhan.pg@gmail.com>)
List pgsql-hackers
Hi,

Thanks for the new version! Looks like we're making progress towards
something committable here.

I think it'd be good to split the patch into a few pieces. I'd maybe do
that like:
1) WAL format changes (plus required other changes)
2) Recovery conflicts with slots
3) logical decoding on standby
4) tests


> @@ -589,6 +590,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
>       */
>
>      /* XLOG stuff */
> +    xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
>      xlrec_reuse.node = rel->rd_node;
>      xlrec_reuse.block = blkno;
>      xlrec_reuse.latestRemovedXid = latestRemovedXid;

Hm. I think we otherwise only ever use
RelationIsAccessibleInLogicalDecoding() on tables, not on indexes.  And
while I think this would mostly work for builtin catalog tables, it
won't work for "user catalog tables" as RelationIsUsedAsCatalogTable()
won't perform any useful checks for indexes.

So I think we either need to look up the table, or pass it down.
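
To make the concern concrete, here is a toy model of the "look up the
table" option. It is only a sketch: ToyRel and both helpers are made-up
stand-ins, not the real Relation struct or relcache API. The assumption
is that the catalog-ness flags are only meaningful on the table, so an
index has to be resolved to its parent before checking.

```c
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a relation; NOT the PostgreSQL Relation struct. */
typedef struct ToyRel
{
    bool        is_index;
    bool        is_system_catalog;   /* builtin catalog table */
    bool        user_catalog_table;  /* option that is only set on tables */
    const struct ToyRel *heap;       /* parent table for an index, else NULL */
} ToyRel;

/* Table-level check: gives no useful answer when handed an index. */
static bool
toy_table_is_catalog(const ToyRel *tbl)
{
    return tbl->is_system_catalog || tbl->user_catalog_table;
}

/* Resolve an index to its table first, then apply the table-level check. */
static bool
toy_on_catalog_table(const ToyRel *rel)
{
    const ToyRel *tbl = (rel->is_index && rel->heap != NULL) ? rel->heap : rel;

    return toy_table_is_catalog(tbl);
}
```

With a user catalog table and an index on it, the table-level check
applied directly to the index says "not a catalog", while the version
that goes through the parent table gives the intended answer.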


> diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
> index d768b9b..10b7857 100644
> --- a/src/backend/access/heap/heapam.c
> +++ b/src/backend/access/heap/heapam.c
> @@ -7149,12 +7149,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
>   * see comments for vacuum_log_cleanup_info().
>   */
>  XLogRecPtr
> -log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
> +log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
>  {
>      xl_heap_cleanup_info xlrec;
>      XLogRecPtr    recptr;
>
> -    xlrec.node = rnode;
> +    xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
> +    xlrec.node = rel->rd_node;
>      xlrec.latestRemovedXid = latestRemovedXid;
>
>      XLogBeginInsert();
> @@ -7190,6 +7191,7 @@ log_heap_clean(Relation reln, Buffer buffer,
>      /* Caller should not call me on a non-WAL-logged relation */
>      Assert(RelationNeedsWAL(reln));
>
> +    xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);

It'd probably be a good idea to add a comment to
RelationIsUsedAsCatalogTable() that it better never invoke anything
performing catalog accesses. Otherwise there's quite the danger with
recursion (some operation doing RelationIsAccessibleInLogicalDecoding(),
that then accessing the catalog, which in turn could again need to
perform said operation, loop).



>  /* Entry in pending-list of TIDs we need to revisit */
> @@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
>      OffsetNumber itemnos[MaxIndexTuplesPerPage];
>      spgxlogVacuumRedirect xlrec;
>
> +    xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
>      xlrec.nToPlaceholder = 0;
>      xlrec.newestRedirectXid = InvalidTransactionId;

We should document that it is safe to do catalog accesses here, because
spgist is never used to back catalogs. Otherwise there would be an
endless recursion danger here.

Did you check how hard it would be to just pass down the heap relation?


>  /*
> + * Get the wal_level from the control file.
> + */
> +WalLevel
> +GetActiveWalLevel(void)
> +{
> +    return ControlFile->wal_level;
> +}

What does "Active" mean here? I assume it's supposed to indicate that it
could be different than what's configured in postgresql.conf, for a
replica? If so, that should be mentioned.


> +/*
>   * Initialization of shared memory for XLOG
>   */
>  Size
> @@ -9843,6 +9852,19 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> +        /*
> +         * Drop logical slots if we are in hot standby and master does not have
> +         * logical data.

nitpick: s/master/the primary/ (mostly adding the "the", but I
personally also prefer primary over master)

s/logical data/a WAL level sufficient for logical decoding/


> Don't bother to search for the slots if standby is
> +         * running with wal_level lower than logical, because in that case,
> +         * we would have either disallowed creation of logical slots or dropped
> +         * existing ones.

s/Don't bother/No need/
s/slots/potentially conflicting logical slots/

> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +            ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId,
> +                gettext_noop("Logical decoding on standby requires wal_level >= logical on master."));



> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
> index 151c3ef..c1bd028 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -190,11 +190,23 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>               * can restart from there.
>               */
>              break;
> +        case XLOG_PARAMETER_CHANGE:
> +        {
> +            xl_parameter_change *xlrec =
> +                (xl_parameter_change *) XLogRecGetData(buf->record);
> +            /* Cannot proceed if master itself does not have logical data */

This needs an explanation as to how this is reachable...


> +            if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
> +                ereport(ERROR,
> +                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                         errmsg("logical decoding on standby requires "
> +                                "wal_level >= logical on master")));
> +            break;

Hm, this strikes me as a not quite good enough error message (same in
other copies of the message). Perhaps something roughly like "could not
continue with logical decoding, the primary's wal level is now too low
(%u)"?
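
A rough, standalone illustration of that wording follows. In the patch
this would go through ereport()/errmsg(); the helper below is purely
hypothetical and uses snprintf() so that it can be compiled on its own.
The level is passed as a plain unsigned integer, matching the %u above.

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper, not part of the patch: format the suggested
 * message, including the wal level the primary has switched to.
 */
static void
toy_wal_level_error(char *buf, size_t len, unsigned int primary_wal_level)
{
    snprintf(buf, len,
             "could not continue with logical decoding, "
             "the primary's wal level is now too low (%u)",
             primary_wal_level);
}
```

Whether printing the raw number is useful to users, as opposed to the
level's name, is a separate question.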


>      if (RecoveryInProgress())
> -        ereport(ERROR,
> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                 errmsg("logical decoding cannot be used while in recovery")));
> +    {
> +        /*
> +         * This check may have race conditions, but whenever
> +         * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
> +         * verify that there are no existing logical replication slots. And to
> +         * avoid races around creating a new slot,
> +         * CheckLogicalDecodingRequirements() is called once before creating
> +         * the slot, and once when logical decoding is initially starting up.
> +         */
> +        if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("logical decoding on standby requires "
> +                            "wal_level >= logical on master")));
> +    }
>  }
>
>  /*
> @@ -241,6 +240,8 @@ CreateInitDecodingContext(char *plugin,
>      LogicalDecodingContext *ctx;
>      MemoryContext old_context;
>
> +    CheckLogicalDecodingRequirements();
> +

This should reference the above explanation.




>  /*
> + * Permanently drop a conflicting replication slot. If it's already active by
> + * another backend, send it a recovery conflict signal, and then try again.
> + */
> +static void
> +ReplicationSlotDropConflicting(ReplicationSlot *slot)


> +void
> +ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
> +                                        char *conflict_reason)
> +{
> +            /*
> +             * Build the conflict_str which will look like :
> +             * "Slot conflicted with xid horizon which was being increased
> +             * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
> +             */
> +            initStringInfo(&conflict_xmins);
> +            if (TransactionIdIsValid(slot_xmin) &&
> +                TransactionIdPrecedesOrEquals(slot_xmin, xid))
> +            {
> +                appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin);
> +            }
> +            if (TransactionIdIsValid(slot_catalog_xmin) &&
> +                TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> +                appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d",
> +                                 conflict_xmins.len > 0 ? ", " : "",
> +                                 slot_catalog_xmin);
> +
> +            if (conflict_xmins.len > 0)
> +            {
> +                initStringInfo(&conflict_str);
> +                appendStringInfo(&conflict_str, "%s %d (%s).",
> +                                 conflict_sentence, xid, conflict_xmins.data);
> +                found_conflict = true;
> +                conflict_reason = conflict_str.data;
> +            }
> +        }


I think this is going to be a nightmare for translators, no? I'm not
clear as to why any of this is needed?
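
If the detail is kept at all, one translator-friendlier shape is to pick
between complete sentences rather than gluing fragments together. The
sketch below is only an illustration under assumptions: the function
name and the message wording are made up, 0 stands for an invalid xid,
and the comparison is a plain <= that ignores xid wraparound (the real
code would use TransactionIdPrecedesOrEquals()). Xids are printed
with %u since they are unsigned.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t ToyXid;        /* stand-in for TransactionId; 0 = invalid */

/*
 * Hypothetical helper: write one complete sentence describing the
 * conflict into buf. Returns false (buf untouched) if neither xmin
 * conflicts with the new horizon.
 */
static bool
toy_conflict_detail(char *buf, size_t len, ToyXid horizon,
                    ToyXid slot_xmin, ToyXid slot_catalog_xmin)
{
    /* Simplified comparison; ignores wraparound on purpose. */
    bool        xmin_hit = slot_xmin != 0 && slot_xmin <= horizon;
    bool        catalog_hit = slot_catalog_xmin != 0 &&
                              slot_catalog_xmin <= horizon;

    if (xmin_hit && catalog_hit)
        snprintf(buf, len,
                 "slot xmin %u and catalog_xmin %u conflict with xid horizon %u",
                 (unsigned) slot_xmin, (unsigned) slot_catalog_xmin,
                 (unsigned) horizon);
    else if (xmin_hit)
        snprintf(buf, len, "slot xmin %u conflicts with xid horizon %u",
                 (unsigned) slot_xmin, (unsigned) horizon);
    else if (catalog_hit)
        snprintf(buf, len, "slot catalog_xmin %u conflicts with xid horizon %u",
                 (unsigned) slot_catalog_xmin, (unsigned) horizon);
    else
        return false;

    return true;
}
```

Each branch is a whole sentence, so a translator never has to guess how
the pieces will be assembled.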



> +            /* ReplicationSlotDropPtr() would acquire the lock below */
> +            LWLockRelease(ReplicationSlotControlLock);

"would acquire"? I think it *does* acquire, right?



> @@ -2879,6 +2882,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>              case PROCSIG_RECOVERY_CONFLICT_LOCK:
>              case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>              case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +                /*
> +                 * For conflicts that require a logical slot to be dropped, the
> +                 * requirement is for the signal receiver to release the slot,
> +                 * so that it could be dropped by the signal sender. So for
> +                 * normal backends, the transaction should be aborted, just
> +                 * like for other recovery conflicts. But if it's walsender on
> +                 * standby, then it has to be killed so as to release an
> +                 * acquired logical slot.
> +                 */
> +                if (am_cascading_walsender &&
> +                    reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> +                    MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> +                {
> +                    RecoveryConflictPending = true;
> +                    QueryCancelPending = true;
> +                    InterruptPending = true;
> +                    break;
> +                }

Huh, I'm not following as to why that's needed for walsenders?


> @@ -1499,6 +1499,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
>                            dbentry->n_conflict_tablespace +
>                            dbentry->n_conflict_lock +
>                            dbentry->n_conflict_snapshot +
> +                          dbentry->n_conflict_logicalslot +
>                            dbentry->n_conflict_bufferpin +
>                            dbentry->n_conflict_startup_deadlock);

I think this probably needs adjustments in a few more places,
e.g. monitoring.sgml...

Thanks!

Andres Freund


