Re: pglogical_output - a general purpose logical decoding output plugin - Mailing list pgsql-hackers

From Andres Freund
Subject Re: pglogical_output - a general purpose logical decoding output plugin
Date
Msg-id 20160129101642.vgux2yh3prejunkw@alap3.anarazel.de
In response to pglogical_output - a general purpose logical decoding output plugin  (Craig Ringer <craig@2ndquadrant.com>)
Responses Re: pglogical_output - a general purpose logical decoding output plugin  (Craig Ringer <craig@2ndquadrant.com>)
List pgsql-hackers
Hi,

so, I'm reviewing the output of:
> git diff $(git merge-base upstream/master 2ndq/dev/pglogical-output)..2ndq/dev/pglogical-output
> diff --git a/contrib/Makefile b/contrib/Makefile
> index bd251f6..028fd9a 100644
> --- a/contrib/Makefile
> +++ b/contrib/Makefile
> @@ -35,6 +35,8 @@ SUBDIRS = \
>          pg_stat_statements \
>          pg_trgm        \
>          pgcrypto    \
> +        pglogical_output \
> +        pglogical_output_plhooks \

I'm doubtful we want these plhooks. You aren't allowed to access normal
(non user catalog) tables in output plugins. That seems like too much to
expose to plpgsql functions imo.

> +++ b/contrib/pglogical_output/README.md

I don't think we have markdown in postgres so far - so let's just keep the
current content and remove the .md :P

> +==== Table metadata header
> +
> +|===
> +|*Message*|*Type/Size*|*Notes*
> +
> +|Message type|signed char|Literal ‘**R**’ (0x52)
> +|flags|uint8| * 0-6: Reserved, client _must_ ERROR if set and not recognised.
> +|relidentifier|uint32|Arbitrary relation id, unique for this upstream. In practice this will probably be the upstream table’s oid, but the downstream can’t assume anything.
 
> +|nspnamelength|uint8|Length of namespace name
> +|nspname|signed char[nspnamelength]|Relation namespace (null terminated)
> +|relnamelength|uint8|Length of relation name
> +|relname|char[relname]|Relation name (null terminated)
> +|attrs block|signed char|Literal: ‘**A**’ (0x41)
> +|natts|uint16|number of attributes
> +|[fields]|[composite]|Sequence of ‘natts’ column metadata blocks, each of which begins with a column delimiter followed by zero or more column metadata blocks, each with the same column metadata block header.
 

That's a fairly high overhead. Hm.


> +== JSON protocol
> +
> +If `proto_format` is set to `json` then the output plugin will emit JSON
> +instead of the custom binary protocol. JSON support is intended mainly for
> +debugging and diagnostics.
> +

I'm fairly strongly opposed to including two formats in one output
plugin. I think the demand for being able to look into the binary
protocol should instead be satisfied by having a function that "expands"
the binary data returned into something easier to understand.
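To make the "expand" idea concrete, here is a minimal sketch in standalone C rather than as a SQL-callable PostgreSQL function. It renders the table metadata header quoted above as one line of text. The function name `expand_relmeta_header`, the big-endian integer encoding, the output format, and the reading of the length fields as including the terminating NUL are all assumptions made for illustration; none of them come from the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: turn a binary 'R' (table metadata) header into text.
 * Field order follows the README table quoted above. Assumed here: integers
 * are big-endian and the name lengths include the terminating NUL.
 *
 * Returns the number of characters written, or -1 if the message is
 * truncated, malformed, or does not fit into "out".
 */
int
expand_relmeta_header(const uint8_t *msg, size_t len, char *out, size_t outlen)
{
    size_t      pos = 0;
    unsigned    flags, nsplen, rellen, natts;
    uint32_t    relid;
    const char *nspname, *relname;
    int         n;

    assert(msg != NULL && out != NULL);

    /* 'R' + flags + relidentifier (4 bytes) + nspnamelength = 7 bytes */
    if (len < 7 || msg[pos++] != 'R')
        return -1;
    flags = msg[pos++];
    relid = ((uint32_t) msg[pos] << 24) | ((uint32_t) msg[pos + 1] << 16) |
            ((uint32_t) msg[pos + 2] << 8) | (uint32_t) msg[pos + 3];
    pos += 4;

    /* namespace name: length byte, then that many bytes ending in NUL */
    nsplen = msg[pos++];
    if (nsplen == 0 || len - pos < nsplen || msg[pos + nsplen - 1] != '\0')
        return -1;
    nspname = (const char *) (msg + pos);
    pos += nsplen;

    /* relation name, same layout */
    if (len - pos < 1)
        return -1;
    rellen = msg[pos++];
    if (rellen == 0 || len - pos < rellen || msg[pos + rellen - 1] != '\0')
        return -1;
    relname = (const char *) (msg + pos);
    pos += rellen;

    /* attrs block marker 'A' followed by natts (uint16) */
    if (len - pos < 3 || msg[pos++] != 'A')
        return -1;
    natts = ((unsigned) msg[pos] << 8) | (unsigned) msg[pos + 1];

    n = snprintf(out, outlen, "R flags=0x%02x relid=%lu rel=%s.%s natts=%u",
                 flags, (unsigned long) relid, nspname, relname, natts);
    if (n < 0 || (size_t) n >= outlen)
        return -1;
    return n;
}
```

With a helper along these lines the plugin would only ever emit the binary format, and debugging would go through the decoder instead of a second output path.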

> + * Copyright (c) 2012-2015, PostgreSQL Global Development Group

2016 ;)


> +            case PARAM_BINARY_BASETYPES_MAJOR_VERSION:
> +                val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
> +                data->client_binary_basetypes_major_version = DatumGetUInt32(val);
> +                break;

Why is the major version tied to basetypes (by name)? Seems more
generally useful.


> +            case PARAM_RELMETA_CACHE_SIZE:
> +                val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_INT32);
> +                data->client_relmeta_cache_size = DatumGetInt32(val);
> +                break;

I'm not convinced this a) should be optional b) should have a size
limit. Please argue for that choice. And how should the client e.g. know
about evictions in that cache?



> --- /dev/null
> +++ b/contrib/pglogical_output/pglogical_config.h
> @@ -0,0 +1,55 @@
> +#ifndef PG_LOGICAL_CONFIG_H
> +#define PG_LOGICAL_CONFIG_H
> +
> +#ifndef PG_VERSION_NUM
> +#error <postgres.h> must be included first
> +#endif

Huh?

> +#include "nodes/pg_list.h"
> +#include "pglogical_output.h"
> +
> +inline static bool
> +server_float4_byval(void)
> +{
> +#ifdef USE_FLOAT4_BYVAL
> +    return true;
> +#else
> +    return false;
> +#endif
> +}
> +
> +inline static bool
> +server_float8_byval(void)
> +{
> +#ifdef USE_FLOAT8_BYVAL
> +    return true;
> +#else
> +    return false;
> +#endif
> +}
> +
> +inline static bool
> +server_integer_datetimes(void)
> +{
> +#ifdef USE_INTEGER_DATETIMES
> +    return true;
> +#else
> +    return false;
> +#endif
> +}
> +
> +inline static bool
> +server_bigendian(void)
> +{
> +#ifdef WORDS_BIGENDIAN
> +    return true;
> +#else
> +    return false;
> +#endif
> +}

Not convinced these should exist, and even more so not exposed in a header.

> +/*
> + * Returns Oid of the hooks function specified in funcname.
> + *
> + * Error is thrown if function doesn't exist or doen't return correct datatype
> + * or is volatile.
> + */
> +static Oid
> +get_hooks_function_oid(List *funcname)
> +{
> +    Oid            funcid;
> +    Oid            funcargtypes[1];
> +
> +    funcargtypes[0] = INTERNALOID;
> +
> +    /* find the the function */
> +    funcid = LookupFuncName(funcname, 1, funcargtypes, false);
> +
> +    /* Validate that the function returns void */
> +    if (get_func_rettype(funcid) != VOIDOID)
> +    {
> +        ereport(ERROR,
> +                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
> +                 errmsg("function %s must return void",
> +                        NameListToString(funcname))));
> +    }

Hm, this seems easy to poke holes into. I mean you later use it like:

> +    if (data->hooks_setup_funcname != NIL)
> +    {
> +        hooks_func = get_hooks_function_oid(data->hooks_setup_funcname);
> +
> +        old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
> +        (void) OidFunctionCall1(hooks_func, PointerGetDatum(&data->hooks));
> +        MemoryContextSwitchTo(old_ctxt);

e.g. you basically assume the function does something reasonable
with those types. Why don't we instead create a 'plogical_hooks' return
type, and have the function return that?
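A rough standalone sketch of the "return a dedicated type" idea follows. The setup function hands back a tagged struct and the caller validates the tag before trusting it, which is similar in spirit to how FDW handler functions return a node that the caller checks. Every name here (`PlogicalHooks`, `PLOGICAL_HOOKS_MAGIC`, `load_hooks`, and so on) is hypothetical, and the magic number merely stands in for a PostgreSQL node tag check.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical tag; in PostgreSQL proper a NodeTag and IsA() would fill this role. */
#define PLOGICAL_HOOKS_MAGIC 0x504c4848u

typedef int (*row_filter_fn) (const void *change);  /* nonzero = send the row */

typedef struct PlogicalHooks
{
    unsigned      magic;        /* must equal PLOGICAL_HOOKS_MAGIC */
    row_filter_fn row_filter;   /* may be NULL */
} PlogicalHooks;

typedef const PlogicalHooks *(*hooks_setup_fn) (void);

/*
 * Call the setup function and validate what it returned, instead of handing
 * it a pointer and hoping it fills the struct in sensibly.
 * Returns NULL if the result does not look like a hooks struct.
 */
const PlogicalHooks *
load_hooks(hooks_setup_fn setup)
{
    const PlogicalHooks *hooks;

    assert(setup != NULL);
    hooks = setup();
    if (hooks == NULL || hooks->magic != PLOGICAL_HOOKS_MAGIC)
        return NULL;
    return hooks;
}

/* Example setup functions: one well-formed, one returning an untagged struct. */
static int
keep_all(const void *change)
{
    (void) change;
    return 1;
}

const PlogicalHooks *
good_setup(void)
{
    static const PlogicalHooks hooks = {PLOGICAL_HOOKS_MAGIC, keep_all};

    return &hooks;
}

const PlogicalHooks *
bad_setup(void)
{
    static const PlogicalHooks hooks = {0, NULL};

    return &hooks;
}
```

The point of the design is that a function with the wrong shape is rejected at load time rather than silently scribbling over whatever it was passed.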

> +    if (func_volatile(funcid) == PROVOLATILE_VOLATILE)
> +    {
> +        ereport(ERROR,
> +                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
> +                 errmsg("function %s must not be VOLATILE",
> +                        NameListToString(funcname))));
> +    }

Hm, not sure what that's supposed to achieve. You could argue for
requiring the function to be immutable (i.e. not stable or volatile),
but I'm not sure what that'd achieve.

> +        old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
> +        (void) (*data->hooks.startup_hook)(&args);
> +        MemoryContextSwitchTo(old_ctxt);

What is the hooks memory context intended to achieve? It's apparently
never reset. Normally output plugin callbacks are called in more
short-lived memory contexts, for good reason, to avoid leaks...
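The leak concern can be illustrated with a toy arena standing in for a PostgreSQL memory context. This is only a sketch: `Arena`, `arena_alloc`, `arena_reset` and `call_hook` are made-up names, and in the real plugin the equivalent step would be resetting the context (for example with `MemoryContextReset`) after each hook call.

```c
#include <assert.h>
#include <stddef.h>

/* Toy fixed-size arena; a stand-in for a MemoryContext, not the real thing. */
typedef struct Arena
{
    _Alignas(max_align_t) unsigned char buf[1024];
    size_t      used;
} Arena;

/* Bump allocation; returns NULL once the arena is exhausted. */
void *
arena_alloc(Arena *a, size_t n)
{
    const size_t align = _Alignof(max_align_t);
    void       *p;

    assert(a != NULL);
    n = (n + align - 1) / align * align;    /* keep later allocations aligned */
    if (n > sizeof(a->buf) - a->used)
        return NULL;
    p = a->buf + a->used;
    a->used += n;
    return p;
}

/* Release everything allocated from the arena in one step. */
void
arena_reset(Arena *a)
{
    a->used = 0;
}

typedef int (*hook_fn) (Arena *scratch, int input);

/*
 * Run a hook in a scratch arena and reset it afterwards, so anything the
 * hook allocated and forgot about cannot accumulate across calls.
 */
int
call_hook(Arena *scratch, hook_fn hook, int input)
{
    int         ret = hook(scratch, input);

    arena_reset(scratch);
    return ret;
}

/* Example hook that allocates scratch space and never frees it. */
int
leaky_hook(Arena *scratch, int input)
{
    int        *tmp = arena_alloc(scratch, 100 * sizeof(int));

    if (tmp == NULL)
        return -1;              /* arena exhausted */
    tmp[0] = input;
    return tmp[0] * 2;
}
```

Without the reset in `call_hook`, the example hook would exhaust the 1024-byte arena after a handful of calls; with it, every call starts from an empty arena.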


> +bool
> +call_row_filter_hook(PGLogicalOutputData *data, ReorderBufferTXN *txn,
> +        Relation rel, ReorderBufferChange *change)
> +{
> +    struct  PGLogicalRowFilterArgs hook_args;
> +    MemoryContext old_ctxt;
> +    bool ret = true;
> +
> +    if (data->hooks.row_filter_hook != NULL)
> +    {
> +        hook_args.change_type = change->action;
> +        hook_args.private_data = data->hooks.hooks_private_data;
> +        hook_args.changed_rel = rel;
> +        hook_args.change = change;
> +
> +        elog(DEBUG3, "calling pglogical row filter hook");
> +
> +        old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
> +        ret = (*data->hooks.row_filter_hook)(&hook_args);

Why aren't we passing txn to the filter? ISTM it'd be better to
basically reuse/extend the signature of the original change
callback.


> +/* These must be available to pg_dlsym() */

No, the following don't? And they aren't, since they're static functions?
_PG_init and _PG_output_plugin_init need to be, but that's it.


> +/*
> + * COMMIT callback
> + */
> +void
> +pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> +                     XLogRecPtr commit_lsn)
> +{

Missing statics?
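The linkage point in the last two comments can be sketched with a simplified mock. `MockOutputPluginCallbacks` and `mock_output_plugin_init` are invented stand-ins for PostgreSQL's callback struct and `_PG_output_plugin_init`, and the callback signatures are reduced to a counter so the example stays self-contained.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in for the real callback struct; the signatures are made up. */
typedef struct MockOutputPluginCallbacks
{
    void        (*begin_cb) (int *counter);
    void        (*commit_cb) (int *counter);
} MockOutputPluginCallbacks;

/*
 * The callbacks are file-local. Nothing looks them up by name; they are only
 * ever reached through the function pointers registered below.
 */
static void
pg_decode_begin_txn(int *counter)
{
    *counter += 1;
}

static void
pg_decode_commit_txn(int *counter)
{
    *counter += 10;
}

/*
 * The init function is the one symbol in this sketch that a dynamic loader
 * would need to find, so it is the only one with external linkage.
 */
void
mock_output_plugin_init(MockOutputPluginCallbacks *cb)
{
    assert(cb != NULL);
    cb->begin_cb = pg_decode_begin_txn;
    cb->commit_cb = pg_decode_commit_txn;
}
```

A caller only needs the init function: after `mock_output_plugin_init(&cb)`, invoking `cb.begin_cb` and `cb.commit_cb` reaches the static functions without their names ever being exported.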



> +/*
> + * Relation metadata invalidation, for when a relcache invalidation
> + * means that we need to resend table metadata to the client.
> + */
> +static void
> +relmeta_cache_callback(Datum arg, Oid relid)
> + {
> +    /*
> +     * We can be called after decoding session teardown becaues the
> +     * relcache callback isn't cleared. In that case there's no action
> +     * to take.
> +     */
> +    if (RelMetaCache == NULL)
> +        return;
> +
> +    /*
> +     * Nobody keeps pointers to entries in this hash table around so
> +     * it's safe to directly HASH_REMOVE the entries as soon as they are
> +     * invalidated. Finding them and flagging them invalid then removing
> +     * them lazily might save some memory churn for tables that get
> +     * repeatedly invalidated and re-sent, but it dodesn't seem worth
> +     * doing.
> +     *
> +     * Getting invalidations for relations that aren't in the table is
> +     * entirely normal, since there's no way to unregister for an
> +     * invalidation event. So we don't care if it's found or not.
> +     */
> +    (void) hash_search(RelMetaCache, &relid, HASH_REMOVE, NULL);
> + }

So, I don't buy this, like at all. The cache entry is passed to
functions, while we call output functions and such. Which in turn can
cause cache invalidations to be processed.
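One way around that hazard is the alternative the quoted comment itself mentions: flag entries as invalid in the callback and remove them lazily at a point where no pointers are held. The following is a minimal standalone sketch of that approach, using a fixed array instead of a dynahash table; all names (`CacheEntry`, `cache_get`, `cache_invalidate`, `cache_sweep`) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define CACHE_SLOTS 8

typedef struct CacheEntry
{
    unsigned    relid;          /* 0 marks a free slot */
    bool        is_valid;       /* cleared by the invalidation callback */
    bool        is_cached;      /* does the client have this relation? */
} CacheEntry;

static CacheEntry cache[CACHE_SLOTS];

/* Find or create the entry for relid; returns NULL if the cache is full. */
CacheEntry *
cache_get(unsigned relid)
{
    CacheEntry *slot = NULL;
    size_t      i;

    assert(relid != 0);
    for (i = 0; i < CACHE_SLOTS; i++)
    {
        if (cache[i].relid == relid)
        {
            slot = &cache[i];
            if (slot->is_valid)
                return slot;
            break;              /* stale entry: reinitialise it below */
        }
        if (cache[i].relid == 0 && slot == NULL)
            slot = &cache[i];
    }
    if (slot != NULL)
    {
        slot->relid = relid;
        slot->is_valid = true;
        slot->is_cached = false;    /* metadata must be sent (again) */
    }
    return slot;
}

/*
 * Invalidation callback: only flags the entry. A caller that still holds a
 * pointer to it keeps pointing at live memory for the same relation.
 */
void
cache_invalidate(unsigned relid)
{
    size_t      i;

    for (i = 0; i < CACHE_SLOTS; i++)
        if (cache[i].relid == relid)
            cache[i].is_valid = false;
}

/* Free flagged entries; call only where no entry pointers are in use. */
void
cache_sweep(void)
{
    size_t      i;

    for (i = 0; i < CACHE_SLOTS; i++)
        if (cache[i].relid != 0 && !cache[i].is_valid)
            cache[i].relid = 0;
}
```

The cost is a little extra bookkeeping, but an invalidation that arrives in the middle of processing a change can no longer pull the entry out from under the code that is using it.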

> +struct PGLRelMetaCacheEntry
> +{
> +    Oid relid;
> +    /* Does the client have this relation cached? */
> +    bool is_cached;
> +    /* Field for API plugin use, must be alloc'd in decoding context */
> +    void *api_private;
> +};

I don't see how api_private can safely be used. At the very least it
needs a lot more documentation about memory lifetime rules and
such. Afaics we'd just forever leak memory atm.

Greetings,

Andres Freund


