BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FORUPDATE) from which UPDATE performed recently - Mailing list pgsql-bugs

From dsuchka@gmail.com
Subject BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FORUPDATE) from which UPDATE performed recently
Date
Msg-id 20171213191216.20144.83388@wrigleys.postgresql.org
Whole thread Raw
Responses Re: BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FORUPDATE) from which UPDATE performed recently  ("David G. Johnston" <david.g.johnston@gmail.com>)
List pgsql-bugs
The following bug has been logged on the website:

Bug reference:      14972
Logged by:          Evgeniy Kozlov
Email address:      dsuchka@gmail.com
PostgreSQL version: 9.5.5
Operating system:   gentoo, debian
Description:

Since ON CONFLICT does not work with partitions, I have written an
aggregation appender by hand using UPDATE (for existing rows) + INSERT (for
new ones). Unexpectedly, I got a strange count of updated (really, joined)
rows when running that function concurrently on 9.5.5 and 9.5.7 (9.5.2
works correctly).
The value obtained exceeds the expected result by 1.

This happens only if the CTE with the SELECT/JOIN is performed with the
FOR UPDATE option, or if no UPDATE is actually performed.

Here is the result of a concurrent invocation (the first two entries are
SELECT results from the same CTE, and they differ):

psql:/tmp/go.sql:276: NOTICE:  
 *** joined data (first time):
["(1011,0,1,1)", "(1011,0,1,1)", "(1012,0,2,2)", "(1013,1,0,3)",
"(1014,1,1,4)", "(1015,1,2,5)", "(1016,2,0,6)", "(1017,2,1,7)",
"(1018,2,2,8)", "(1019,3,0,9)", "(1020,3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:  
 *** joined data (next time):
["(1011,0,1,1)", "(1012,0,2,2)", "(1013,1,0,3)", "(1014,1,1,4)",
"(1015,1,2,5)", "(1016,2,0,6)", "(1017,2,1,7)", "(1018,2,2,8)",
"(1019,3,0,9)", "(1020,3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:  
 *** input data:
["(0,1,1)", "(0,2,2)", "(1,0,3)", "(1,1,4)", "(1,2,5)", "(2,0,6)",
"(2,1,7)", "(2,2,8)", "(3,0,9)", "(3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:  
 *** overall agg data:
["(1011,0,1,2981)", "(1012,0,2,5962)", "(1013,1,0,8943)",
"(1014,1,1,11924)", "(1015,1,2,14905)", "(1016,2,0,17886)",
"(1017,2,1,20867)", "(1018,2,2,23848)", "(1019,3,0,26829)",
"(1020,3,1,29810)"]

file /tmp/go.sql:
-- Feed append_agg a 10-element request: for i = 1..10 the element is
-- {"a": i / 3, "b": i % 3, "x": i} (integer division and modulo).
SELECT test_agg.append_agg(
    (
        SELECT jsonb_agg(
                   jsonb_build_object('a', s.n / 3, 'b', s.n % 3, 'x', s.n)
                   ORDER BY s.n
               )
        FROM generate_series(1, 10) AS s(n)
    )
);
... repeat 100+ times
\i /tmp/go.sql -- yes, the script includes itself recursively (no tail recursion, so the number of open file descriptors keeps growing)
(end of go.sql)

Code (this is a simplified test version):
-- Namespace holding every object of this test case.
CREATE SCHEMA IF NOT EXISTS test_agg;

-- Aggregate store: one row per (key_a, key_b) pair.
CREATE TABLE test_agg.some_agg
(
    id    bigserial,          -- surrogate key, filled from its own sequence
    key_a integer NOT NULL,
    key_b integer NOT NULL,
    value integer NOT NULL,

    PRIMARY KEY (id),
    UNIQUE (key_a, key_b)     -- at most one row per key pair
);

-- Appends a batch of values to test_agg.some_agg: for every request element
-- whose (a, b) pair already exists as (key_a, key_b), "x" is added to the
-- stored value; every remaining element is inserted as a new row.
--
-- Request: [ {"a": <int4>, "b": <int4>, "x": <int4>}, ... ]
-- Returns: {"i": <rows inserted by the final INSERT>,
--           "u": <joined-row count summed over all loop passes>}
--
-- Diagnostics: if a single pass joins more rows than the request contained,
-- the collected data is reported through NOTICEs and the function pauses for
-- 10 seconds.
CREATE OR REPLACE FUNCTION test_agg.append_agg(request jsonb)
    RETURNS jsonb
    LANGUAGE plpgsql
    AS $$
DECLARE
    input_count         integer;        -- number of rows in the request
    updated_count       integer := 0;   -- joined rows, summed over all passes
    inserted_count      integer;        -- ROW_COUNT of the successful INSERT
    joined_count        integer;        -- joined rows in the current pass
    joined_count_data   jsonb;          -- {"count": ..., "data": ...} from the first read of "joined"
    joined_data         text;           -- second read of "joined", as text
    input_data          text;           -- contents of _tt_input, as text
    agg_data            text;           -- contents of test_agg.some_agg, as text
BEGIN
    -- convert the input data (json -> table)
    CREATE TEMP TABLE _tt_input AS
        SELECT  (i->>'a')::integer AS a,
                (i->>'b')::integer AS b,
                (i->>'x')::integer AS x
            FROM jsonb_array_elements(request) AS i
    ;
    ALTER TABLE _tt_input ADD PRIMARY KEY (a, b);
    SELECT count(*) INTO input_count FROM _tt_input;

    -- Each pass updates the rows that already exist, removes them from the
    -- pending input and then tries to insert the rest. A unique_violation on
    -- the INSERT is swallowed and the loop starts another pass.
    <<append_loop>>
    LOOP
        -- update existed agg rows
        --
        -- Toggle markers: removing the leading "--" from a "--/*" / "--*/"
        -- pair turns the lines between them into a block comment, which
        -- disables the ORDER BY ... FOR UPDATE clause or the "updated" CTE.
        WITH
            joined AS (
                SELECT agg.id, src.*
                    FROM test_agg.some_agg AS agg
                    JOIN _tt_input AS src
                        ON (agg.key_a = src.a) AND (agg.key_b = src.b)
--/*
                    ORDER BY agg.key_a, agg.key_b
                    FOR UPDATE
--*/
            ),
--/*
            updated AS (
                UPDATE test_agg.some_agg AS agg
                    SET value = agg.value + src.x
                    FROM joined AS src
                    WHERE (agg.key_a = src.a) AND (agg.key_b = src.b)
            ),
--*/
            cleaned AS (
                DELETE FROM _tt_input
                    WHERE (a, b) IN (SELECT x.a, x.b FROM joined AS x)
            )

            -- "joined" is read twice on purpose: once for count + data and
            -- once for data only, so the two reads can be compared.
            -- NOTE(review): per the PostgreSQL documentation on data-modifying
            -- WITH, all sub-statements share one snapshot, so the _tt_input and
            -- some_agg reads below presumably show the state before this
            -- statement's own DELETE/UPDATE -- confirm.
            SELECT
                    (SELECT jsonb_build_object('count', foo.c, 'data', foo.d)
                        FROM (SELECT count(*), jsonb_agg((x.*)::text) FROM joined AS x) AS foo(c, d)),
                    (SELECT jsonb_agg(row(x.*)::text) FROM joined AS x)::text,
                    (SELECT jsonb_agg(row(x.*)::text) FROM _tt_input AS x)::text,
                    (SELECT jsonb_agg(row(x.*)::text) FROM test_agg.some_agg AS x)::text
                INTO
                    joined_count_data,
                    joined_data,
                    input_data,
                    agg_data
        ; -- end WITH
        joined_count := (joined_count_data->>'count')::integer;
        updated_count := updated_count + joined_count;

        -- More joined rows than request rows: report everything collected.
        IF (joined_count > input_count) THEN
            RAISE NOTICE E'\n *** joined data (first time):\n%', joined_count_data->>'data';
            RAISE NOTICE E'\n *** joined data (next time):\n%', joined_data;
            RAISE NOTICE E'\n *** input data:\n%', input_data;
            RAISE NOTICE E'\n *** overall agg data:\n%', agg_data;
            -- PERFORM, not SELECT: a bare SELECT without INTO fails in
            -- PL/pgSQL with "query has no destination for result data".
            PERFORM pg_sleep(10);
        END IF;

        -- try to insert new ones
        BEGIN
            INSERT INTO test_agg.some_agg(key_a, key_b, value)
                SELECT a, b, x FROM _tt_input;
            GET DIAGNOSTICS inserted_count := ROW_COUNT;
            EXIT append_loop;
        EXCEPTION
            WHEN unique_violation THEN
                -- Deliberately ignored: fall through to the next loop pass.
                NULL;
        END;
    END LOOP append_loop;

    DROP TABLE _tt_input;

    RETURN jsonb_build_object('i', inserted_count, 'u', updated_count);
END
$$;


pgsql-bugs by date:

Previous
From: Tom Lane
Date:
Subject: Re: Build error on Windows 10 of version 9.5.10 using Visual Studio 2015
Next
From: "David G. Johnston"
Date:
Subject: Re: BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FORUPDATE) from which UPDATE performed recently