Thread: BUG #18897: Logical replication conflict after using pg_createsubscriber under heavy load

The following bug has been logged on the website:

Bug reference:      18897
Logged by:          Zane Duffield
Email address:      duffieldzane@gmail.com
PostgreSQL version: 17.4
Operating system:   RHEL9 Linux
Description:

Hi all,

I'm in the process of converting our databases from pglogical logical
replication to the native logical replication implementation in PostgreSQL
17. One of the bugs we encountered, and had to work around, with pglogical was
the plugin dropping records while converting a streaming (physical) replica
into a logical replica via pglogical_create_subscriber (reported at
https://github.com/2ndQuadrant/pglogical/issues/349). I was trying to
confirm that the native logical replication implementation does not have this
problem, and I've found that it may have a different one.

In rare cases, I'm finding that the logical replica can start with its
apply worker stuck on a primary-key conflict, even though the replica
received no connections and only applied changes decoded from the one primary.

I should say that I've been operating under the assumption that
pg_createsubscriber is designed to be used on a replica of a *live* primary
database; if that isn't correct, please let me know.

I have a script that I've been using to reproduce the issue (pasted at end
of email, because this bug reporting page doesn't seem to support
attachments). It basically performs a loop that sets up a primary and a
physical replica, generates some load, converts the replica to logical,
waits, and makes sure the row counts are the same.
A few variables at the top of the script may need adjusting, depending on
the system's PostgreSQL installation and the desired locations.
The environment variables NUM_THREADS and INSERT_WIDTH can be used to
control the characteristics of the load generation for the primary.

The script isn't terribly good at reproducing the issue; a failure takes
anywhere from minutes to hours to appear in my testing.
The two machines I've reproduced the issue on both run PostgreSQL 17.4
on RHEL9 in a VM: one VM is moderately powerful (12 cores, 256GB RAM), and
the other is quite weak (2 cores, 2GB RAM).
I found that many combinations of NUM_THREADS and INSERT_WIDTH were able to
reproduce the issue, but NUM_THREADS=40 and INSERT_WIDTH=1000 is a good
starting point for a powerful machine.

Thanks,
Zane

SCRIPT
```
#!/bin/bash
#
# Reproducer for BUG #18897. Each iteration builds a fresh primary plus
# physical standby, converts the standby with pg_createsubscriber while the
# primary is under insert load, then checks that both sides end up with the
# same row count.
#
# Environment overrides: NUM_THREADS, INSERT_WIDTH (load shape).

set -ue
set -o pipefail

# Work from the script's own directory: the server logs ($PWD/log) and the
# trigger file (./stop_inserting) are created relative to it.
# ${BASH_SOURCE[0]%/*} would yield the file name itself when the script is
# started without a slash in its path (e.g. "bash repro.sh"), so the cd
# would fail; dirname returns "." in that case.
cd "$(dirname -- "${BASH_SOURCE[0]}")" || exit

POSTGRES_VERSION=17

# Adjust these for the system under test
PATH="/usr/pgsql-$POSTGRES_VERSION/bin:$PATH"
PRIMARY_DIR="/var/lib/pgsql/$POSTGRES_VERSION/primary"
REPLICA_DIR="/var/lib/pgsql/$POSTGRES_VERSION/replica"

PRIMARY_PORT=5340
REPLICA_PORT=5341

DBNAME="test_db"
TABLE="test_table"
# The insert loops stop as soon as this file exists.
TRIGGER_FILE="./stop_inserting"

# ANSI color escapes as backslash sequences (expanded by echo -e / printf %b).
CYAN="\e[1;36m"
RESET="\e[0m"

# Current iteration number; embedded in the per-test log file names.
TEST_NUM=0
# Tear down any previous primary/standby pair and build a fresh one: a new
# primary cluster (wal_level = logical) and a physical standby cloned from it
# with pg_basebackup -R. Both servers are started before returning.
# Runs in a subshell, so test_start_time does not leak to the caller.
# Globals (read): PRIMARY_DIR, REPLICA_DIR, PRIMARY_PORT, REPLICA_PORT,
#                 TEST_NUM
# Returns: 0 on success; 1 after printing an ERROR line on stderr otherwise.
reset_databases() (
    test_start_time=$(date +"%Y-%m-%d_%H-%M-%S")

    # NB: every pg_ctl / pg_basebackup invocation below must stay on one
    # logical line (continue with "\" or a trailing "&&"). A bare newline
    # after "-m" or "-D" ends the command early. The orphaned argument then
    # runs as a separate command, and the `if` silently tests that instead.
    if pg_ctl status -D "$REPLICA_DIR" > /dev/null 2>&1 &&
        ! pg_ctl stop -m immediate -D "$REPLICA_DIR"; then
        echo "ERROR: Could not stop replica." >&2
        return 1
    fi

    # ${VAR:?} refuses to run rm -rf on an empty or unset path.
    if ! rm -rf -- "${REPLICA_DIR:?}"; then
        echo "ERROR: Could not delete replica directory \"$REPLICA_DIR\"." >&2
        return 1
    fi

    if pg_ctl status -D "$PRIMARY_DIR" > /dev/null 2>&1 &&
        ! pg_ctl stop -m immediate -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not stop primary." >&2
        return 1
    fi

    if ! rm -rf -- "${PRIMARY_DIR:?}"; then
        echo "ERROR: Could not delete primary directory \"$PRIMARY_DIR\"." >&2
        return 1
    fi

    if ! initdb -D "$PRIMARY_DIR" --locale=C; then
        echo "ERROR: Could not create primary." >&2
        return 1
    fi

    # Both servers log into $PWD/log, one file per test run.
    cat >> "$PRIMARY_DIR"/postgresql.auto.conf << EOF
port = $PRIMARY_PORT
wal_level = logical
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10
max_connections = 1000
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_line_prefix = '%m [%p] port=$PRIMARY_PORT %q%u@%d/%a '
EOF

    # Allow the standby's replication connection and pg_createsubscriber's
    # connections to test_db over IPv4 loopback. The original appended the
    # test_db entry twice; the duplicate was dead because pg_hba.conf is
    # first-match.
    cat >> "$PRIMARY_DIR"/pg_hba.conf << EOF
host   replication      postgres     127.0.0.1/32            trust
host       test_db      postgres     127.0.0.1/32            trust
EOF

    if ! pg_ctl start -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not start primary." >&2
        return 1
    fi

    # -R writes standby.signal and primary_conninfo, so the copy starts up as
    # a physical standby of the primary.
    if ! pg_basebackup -h localhost -U postgres -p "$PRIMARY_PORT" \
        -D "$REPLICA_DIR" -P -Xs -R; then
        echo "ERROR: Could not create replica." >&2
        return 1
    fi

    cat >> "$REPLICA_DIR"/postgresql.auto.conf << EOF
port = $REPLICA_PORT
archive_mode = off
hot_standby = on
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_line_prefix = '%m [%p] port=$REPLICA_PORT %q%u@%d/%a '
EOF

    if ! pg_ctl start -D "$REPLICA_DIR"; then
        echo "ERROR: Could not start replica." >&2
        return 1
    fi
)

# Create the test database and its single table on the primary.
# Globals (read): PRIMARY_PORT, DBNAME, TABLE
# Returns: the failing psql's status if either statement fails. CREATE TABLE
# is not attempted when CREATE DATABASE fails, independent of `set -e`.
create_test_database() (
    psql -p "$PRIMARY_PORT" -c "CREATE DATABASE $DBNAME" || return
    psql -p "$PRIMARY_PORT" -d "$DBNAME" \
        -c "CREATE TABLE $TABLE (f1 int primary key, insert_time timestamp without time zone)"
)

# Shape of the insert load; each may be overridden from the environment.
#   NUM_THREADS  - number of concurrent insert loops   (default 20)
#   INSERT_WIDTH - rows inserted per INSERT statement  (default 100)
: "${NUM_THREADS:=20}"
: "${INSERT_WIDTH:=100}"

# One background insert loop ("thread") of the load generator.
# Each INSERT writes INSERT_WIDTH consecutive keys. Thread $1 starts at
# $1*INSERT_WIDTH and advances by NUM_THREADS*INSERT_WIDTH per iteration,
# so threads 0..NUM_THREADS-1 never insert overlapping keys.
# The loop ends when TRIGGER_FILE exists or after more than 15 seconds.
# Globals (read): INSERT_WIDTH, NUM_THREADS, TABLE, PRIMARY_PORT, DBNAME,
#                 TRIGGER_FILE
# Arguments: $1 - thread index (0-based)
# Returns: 0 when the loop ends normally; 1 if an INSERT fails.
add_records_to_test_table() (
    id=$1
    start_time=$(date +%s)
    start=$((id * INSERT_WIDTH)) end=0

    while true; do
        end=$((start + INSERT_WIDTH - 1))
        # When psql reads SQL from stdin it exits 0 even if a statement
        # fails, unless ON_ERROR_STOP is set. Without it, a failed INSERT
        # (e.g. a duplicate key) would never reach the caller's `wait` check.
        if ! printf '%s\n' "INSERT INTO $TABLE SELECT val, CURRENT_TIMESTAMP FROM generate_series($start, $end) S(val);" |
            psql -v ON_ERROR_STOP=1 -p "$PRIMARY_PORT" -d "$DBNAME" > /dev/null; then
            echo "ERROR: insert of keys $start..$end failed" >&2
            return 1
        fi
        start=$((start + NUM_THREADS * INSERT_WIDTH))

        if [[ -f "$TRIGGER_FILE" ]] || (( $(date "+%s") - start_time > 15 )); then
            break
        fi
    done

    return 0
)

# PIDs of the background insert loops launched by start_insert_threads.
INSERT_PIDS=()

# Launch NUM_THREADS background copies of add_records_to_test_table (thread
# ids 0..NUM_THREADS-1). Their PIDs go into INSERT_PIDS so the caller can
# `wait` on each one and see its exit status.
# Globals: NUM_THREADS, INSERT_WIDTH (read); INSERT_PIDS (written)
start_insert_threads() {
    local id
    echo "*** STARTING $NUM_THREADS LOOPS INSERTING $INSERT_WIDTH RECORDS PER ITERATION ..."
    INSERT_PIDS=()
    for ((id = 0; id < NUM_THREADS; id++)); do
        add_records_to_test_table "$id" &
        INSERT_PIDS+=("$!")
    done
}

# Convert the physical standby into a logical subscriber: stop it, run
# pg_createsubscriber against the primary, then start it again.
# Globals (read): REPLICA_DIR, DBNAME, PRIMARY_PORT, REPLICA_PORT
# Returns: 1 (with an ERROR line on stderr) if any step fails. The success
# message is printed only after all three steps succeed, independent of
# whether the caller runs with `set -e`.
create_subscriber() (
    echo "*** Stopping replica, then running pg_createsubscriber..."

    # The standby is stopped first because pg_createsubscriber operates on
    # the data directory of a stopped standby.
    if ! pg_ctl stop -m fast -D "$REPLICA_DIR"; then
        echo "ERROR: Could not stop replica." >&2
        return 1
    fi

    if ! pg_createsubscriber -D "$REPLICA_DIR" \
        --database="$DBNAME" \
        --subscription="sub" \
        --publication="pub" \
        --publisher-server="host=localhost port=$PRIMARY_PORT dbname=$DBNAME"; then
        echo "ERROR: pg_createsubscriber failed." >&2
        return 1
    fi

    if ! pg_ctl start -D "$REPLICA_DIR"; then
        echo "ERROR: Could not start replica." >&2
        return 1
    fi

    echo "*** Successfully started logical replica on port $REPLICA_PORT."
)

# One test iteration:
#   1. create the test table and start the insert load on the primary;
#   2. convert the standby with pg_createsubscriber while the load runs;
#   3. stop the load and check that every insert loop exited cleanly;
#   4. poll both servers every 0.5 s until neither row count changes
#      between two samples, then compare the counts.
# Returns: 1 if an insert loop failed or the final counts differ.
run_test() (
    create_test_database
    rm -f "$TRIGGER_FILE"
    start_insert_threads
    sleep 2
    create_subscriber
    sleep 0.1

    # Signal every insert loop to stop, then collect their exit statuses.
    touch "$TRIGGER_FILE"
    failed=0
    for insert_pid in "${INSERT_PIDS[@]}"; do
        if wait "$insert_pid"; then
            continue
        fi
        echo "ERROR: insert thread with PID $insert_pid failed"
        failed=1
    done
    if (( failed )); then
        return 1
    fi

    echo "*** ALL INSERT LOOPS FINISHED"

    count_sql="SELECT
COUNT(f1) FROM $TABLE"
    prev_src=0
    prev_dest=0

    # Sample both sides until a sample matches the previous one exactly.
    while :; do
        src_count=$(psql -qt -p "$PRIMARY_PORT" -d "$DBNAME" -c "$count_sql")
        dest_count=$(psql -qt -p "$REPLICA_PORT" -d "$DBNAME" -c "$count_sql")

        [[ $dest_count -ne $prev_dest ]] || [[ $src_count -ne $prev_src ]] || break

        prev_dest=$dest_count
        prev_src=$src_count
        sleep 0.5s
    done

    echo "SOURCE COUNT = $src_count"
    echo "DEST COUNT   = $dest_count"

    if (( src_count != dest_count )); then
        echo "ERROR: record count mismatch"
        return 1
    fi

    echo -e "*** TEST PASSED\n"
)

# Run up to 10000 iterations. A failed reset_databases skips that
# iteration's test. A failing run_test is the final command of the && list,
# so `set -e` ends the script right there and leaves both servers running
# for inspection.
for ((TEST_NUM = 1; TEST_NUM <= 10000; TEST_NUM++)); do
    printf '%b\n' "${CYAN}*** STARTING TEST #$TEST_NUM${RESET}"
    reset_databases && run_test
done
```


The script may have been broken by the email's line wrapping (several long lines were split in two), so I've attached the original file to this reply.

From: PG Bug reporting form <noreply@postgresql.org>
Sent: Thursday, April 17, 2025 9:14:19 AM
To: pgsql-bugs@lists.postgresql.org <pgsql-bugs@lists.postgresql.org>
Cc: duffieldzane@gmail.com <duffieldzane@gmail.com>
Subject: BUG #18897: Logical replication conflict after using pg_createsubscriber under heavy load
 
The following bug has been logged on the website:

Bug reference:      18897
Logged by:          Zane Duffield
Email address:      duffieldzane@gmail.com
PostgreSQL version: 17.4
Operating system:   RHEL9 Linux
Description:       

Hi all,

I'm in the process of converting our databases from pglogical logical
replication to the native logical replication implementation in PostgreSQL
17. One of the bugs we encountered, and had to work around, with pglogical was
the plugin dropping records while converting a streaming (physical) replica
into a logical replica via pglogical_create_subscriber (reported at
https://github.com/2ndQuadrant/pglogical/issues/349). I was trying to
confirm that the native logical replication implementation does not have this
problem, and I've found that it may have a different one.

In rare cases, I'm finding that the logical replica can start with its
apply worker stuck on a primary-key conflict, even though the replica
received no connections and only applied changes decoded from the one primary.

I should say that I've been operating under the assumption that
pg_createsubscriber is designed to be used on a replica of a *live* primary
database; if that isn't correct, please let me know.

I have a script that I've been using to reproduce the issue (pasted at end
of email, because this bug reporting page doesn't seem to support
attachments). It basically performs a loop that sets up a primary and a
physical replica, generates some load, converts the replica to logical,
waits, and makes sure the row counts are the same.
A few variables at the top of the script may need adjusting, depending on
the system's PostgreSQL installation and the desired locations.
The environment variables NUM_THREADS and INSERT_WIDTH can be used to
control the characteristics of the load generation for the primary.

The script isn't terribly good at reproducing the issue; a failure takes
anywhere from minutes to hours to appear in my testing.
The two machines I've reproduced the issue on both run PostgreSQL 17.4
on RHEL9 in a VM: one VM is moderately powerful (12 cores, 256GB RAM), and
the other is quite weak (2 cores, 2GB RAM).
I found that many combinations of NUM_THREADS and INSERT_WIDTH were able to
reproduce the issue, but NUM_THREADS=40 and INSERT_WIDTH=1000 is a good
starting point for a powerful machine.

Thanks,
Zane

SCRIPT
```
#!/bin/bash
#
# Reproducer for BUG #18897. Each iteration builds a fresh primary plus
# physical standby, converts the standby with pg_createsubscriber while the
# primary is under insert load, then checks that both sides end up with the
# same row count.
#
# Environment overrides: NUM_THREADS, INSERT_WIDTH (load shape).

set -ue
set -o pipefail

# Work from the script's own directory: the server logs ($PWD/log) and the
# trigger file (./stop_inserting) are created relative to it.
# ${BASH_SOURCE[0]%/*} would yield the file name itself when the script is
# started without a slash in its path (e.g. "bash repro.sh"), so the cd
# would fail; dirname returns "." in that case.
cd "$(dirname -- "${BASH_SOURCE[0]}")" || exit

POSTGRES_VERSION=17

# Adjust these for the system under test
PATH="/usr/pgsql-$POSTGRES_VERSION/bin:$PATH"
PRIMARY_DIR="/var/lib/pgsql/$POSTGRES_VERSION/primary"
REPLICA_DIR="/var/lib/pgsql/$POSTGRES_VERSION/replica"

PRIMARY_PORT=5340
REPLICA_PORT=5341

DBNAME="test_db"
TABLE="test_table"
# The insert loops stop as soon as this file exists.
TRIGGER_FILE="./stop_inserting"

# ANSI color escapes as backslash sequences (expanded by echo -e / printf %b).
CYAN="\e[1;36m"
RESET="\e[0m"

# Current iteration number; embedded in the per-test log file names.
TEST_NUM=0
# Tear down any previous primary/standby pair and build a fresh one: a new
# primary cluster (wal_level = logical) and a physical standby cloned from it
# with pg_basebackup -R. Both servers are started before returning.
# Runs in a subshell, so test_start_time does not leak to the caller.
# Globals (read): PRIMARY_DIR, REPLICA_DIR, PRIMARY_PORT, REPLICA_PORT,
#                 TEST_NUM
# Returns: 0 on success; 1 after printing an ERROR line on stderr otherwise.
reset_databases() (
    test_start_time=$(date +"%Y-%m-%d_%H-%M-%S")

    # NB: every pg_ctl / pg_basebackup invocation below must stay on one
    # logical line (continue with "\" or a trailing "&&"). A bare newline
    # after "-m" or "-D" ends the command early. The orphaned argument then
    # runs as a separate command, and the `if` silently tests that instead.
    if pg_ctl status -D "$REPLICA_DIR" > /dev/null 2>&1 &&
        ! pg_ctl stop -m immediate -D "$REPLICA_DIR"; then
        echo "ERROR: Could not stop replica." >&2
        return 1
    fi

    # ${VAR:?} refuses to run rm -rf on an empty or unset path.
    if ! rm -rf -- "${REPLICA_DIR:?}"; then
        echo "ERROR: Could not delete replica directory \"$REPLICA_DIR\"." >&2
        return 1
    fi

    if pg_ctl status -D "$PRIMARY_DIR" > /dev/null 2>&1 &&
        ! pg_ctl stop -m immediate -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not stop primary." >&2
        return 1
    fi

    if ! rm -rf -- "${PRIMARY_DIR:?}"; then
        echo "ERROR: Could not delete primary directory \"$PRIMARY_DIR\"." >&2
        return 1
    fi

    if ! initdb -D "$PRIMARY_DIR" --locale=C; then
        echo "ERROR: Could not create primary." >&2
        return 1
    fi

    # Both servers log into $PWD/log, one file per test run.
    cat >> "$PRIMARY_DIR"/postgresql.auto.conf << EOF
port = $PRIMARY_PORT
wal_level = logical
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10
max_connections = 1000
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_line_prefix = '%m [%p] port=$PRIMARY_PORT %q%u@%d/%a '
EOF

    # Allow the standby's replication connection and pg_createsubscriber's
    # connections to test_db over IPv4 loopback. The original appended the
    # test_db entry twice; the duplicate was dead because pg_hba.conf is
    # first-match.
    cat >> "$PRIMARY_DIR"/pg_hba.conf << EOF
host   replication      postgres     127.0.0.1/32            trust
host       test_db      postgres     127.0.0.1/32            trust
EOF

    if ! pg_ctl start -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not start primary." >&2
        return 1
    fi

    # -R writes standby.signal and primary_conninfo, so the copy starts up as
    # a physical standby of the primary.
    if ! pg_basebackup -h localhost -U postgres -p "$PRIMARY_PORT" \
        -D "$REPLICA_DIR" -P -Xs -R; then
        echo "ERROR: Could not create replica." >&2
        return 1
    fi

    cat >> "$REPLICA_DIR"/postgresql.auto.conf << EOF
port = $REPLICA_PORT
archive_mode = off
hot_standby = on
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_line_prefix = '%m [%p] port=$REPLICA_PORT %q%u@%d/%a '
EOF

    if ! pg_ctl start -D "$REPLICA_DIR"; then
        echo "ERROR: Could not start replica." >&2
        return 1
    fi
)

# Create the test database and its single table on the primary.
# Globals (read): PRIMARY_PORT, DBNAME, TABLE
# Returns: the failing psql's status if either statement fails. CREATE TABLE
# is not attempted when CREATE DATABASE fails, independent of `set -e`.
create_test_database() (
    psql -p "$PRIMARY_PORT" -c "CREATE DATABASE $DBNAME" || return
    psql -p "$PRIMARY_PORT" -d "$DBNAME" \
        -c "CREATE TABLE $TABLE (f1 int primary key, insert_time timestamp without time zone)"
)

# Shape of the insert load; each may be overridden from the environment.
#   NUM_THREADS  - number of concurrent insert loops   (default 20)
#   INSERT_WIDTH - rows inserted per INSERT statement  (default 100)
: "${NUM_THREADS:=20}"
: "${INSERT_WIDTH:=100}"

# One background insert loop ("thread") of the load generator.
# Each INSERT writes INSERT_WIDTH consecutive keys. Thread $1 starts at
# $1*INSERT_WIDTH and advances by NUM_THREADS*INSERT_WIDTH per iteration,
# so threads 0..NUM_THREADS-1 never insert overlapping keys.
# The loop ends when TRIGGER_FILE exists or after more than 15 seconds.
# Globals (read): INSERT_WIDTH, NUM_THREADS, TABLE, PRIMARY_PORT, DBNAME,
#                 TRIGGER_FILE
# Arguments: $1 - thread index (0-based)
# Returns: 0 when the loop ends normally; 1 if an INSERT fails.
add_records_to_test_table() (
    id=$1
    start_time=$(date +%s)
    start=$((id * INSERT_WIDTH)) end=0

    while true; do
        end=$((start + INSERT_WIDTH - 1))
        # When psql reads SQL from stdin it exits 0 even if a statement
        # fails, unless ON_ERROR_STOP is set. Without it, a failed INSERT
        # (e.g. a duplicate key) would never reach the caller's `wait` check.
        if ! printf '%s\n' "INSERT INTO $TABLE SELECT val, CURRENT_TIMESTAMP FROM generate_series($start, $end) S(val);" |
            psql -v ON_ERROR_STOP=1 -p "$PRIMARY_PORT" -d "$DBNAME" > /dev/null; then
            echo "ERROR: insert of keys $start..$end failed" >&2
            return 1
        fi
        start=$((start + NUM_THREADS * INSERT_WIDTH))

        if [[ -f "$TRIGGER_FILE" ]] || (( $(date "+%s") - start_time > 15 )); then
            break
        fi
    done

    return 0
)

# PIDs of the background insert loops launched by start_insert_threads.
INSERT_PIDS=()

# Launch NUM_THREADS background copies of add_records_to_test_table (thread
# ids 0..NUM_THREADS-1). Their PIDs go into INSERT_PIDS so the caller can
# `wait` on each one and see its exit status.
# Globals: NUM_THREADS, INSERT_WIDTH (read); INSERT_PIDS (written)
start_insert_threads() {
    local id
    echo "*** STARTING $NUM_THREADS LOOPS INSERTING $INSERT_WIDTH RECORDS PER ITERATION ..."
    INSERT_PIDS=()
    for ((id = 0; id < NUM_THREADS; id++)); do
        add_records_to_test_table "$id" &
        INSERT_PIDS+=("$!")
    done
}

# Convert the physical standby into a logical subscriber: stop it, run
# pg_createsubscriber against the primary, then start it again.
# Globals (read): REPLICA_DIR, DBNAME, PRIMARY_PORT, REPLICA_PORT
# Returns: 1 (with an ERROR line on stderr) if any step fails. The success
# message is printed only after all three steps succeed, independent of
# whether the caller runs with `set -e`.
create_subscriber() (
    echo "*** Stopping replica, then running pg_createsubscriber..."

    # The standby is stopped first because pg_createsubscriber operates on
    # the data directory of a stopped standby.
    if ! pg_ctl stop -m fast -D "$REPLICA_DIR"; then
        echo "ERROR: Could not stop replica." >&2
        return 1
    fi

    if ! pg_createsubscriber -D "$REPLICA_DIR" \
        --database="$DBNAME" \
        --subscription="sub" \
        --publication="pub" \
        --publisher-server="host=localhost port=$PRIMARY_PORT dbname=$DBNAME"; then
        echo "ERROR: pg_createsubscriber failed." >&2
        return 1
    fi

    if ! pg_ctl start -D "$REPLICA_DIR"; then
        echo "ERROR: Could not start replica." >&2
        return 1
    fi

    echo "*** Successfully started logical replica on port $REPLICA_PORT."
)

# One test iteration:
#   1. create the test table and start the insert load on the primary;
#   2. convert the standby with pg_createsubscriber while the load runs;
#   3. stop the load and check that every insert loop exited cleanly;
#   4. poll both servers every 0.5 s until neither row count changes
#      between two samples, then compare the counts.
# Returns: 1 if an insert loop failed or the final counts differ.
run_test() (
    create_test_database
    rm -f "$TRIGGER_FILE"
    start_insert_threads
    sleep 2
    create_subscriber
    sleep 0.1

    # Signal every insert loop to stop, then collect their exit statuses.
    touch "$TRIGGER_FILE"
    failed=0
    for insert_pid in "${INSERT_PIDS[@]}"; do
        if wait "$insert_pid"; then
            continue
        fi
        echo "ERROR: insert thread with PID $insert_pid failed"
        failed=1
    done
    if (( failed )); then
        return 1
    fi

    echo "*** ALL INSERT LOOPS FINISHED"

    count_sql="SELECT
COUNT(f1) FROM $TABLE"
    prev_src=0
    prev_dest=0

    # Sample both sides until a sample matches the previous one exactly.
    while :; do
        src_count=$(psql -qt -p "$PRIMARY_PORT" -d "$DBNAME" -c "$count_sql")
        dest_count=$(psql -qt -p "$REPLICA_PORT" -d "$DBNAME" -c "$count_sql")

        [[ $dest_count -ne $prev_dest ]] || [[ $src_count -ne $prev_src ]] || break

        prev_dest=$dest_count
        prev_src=$src_count
        sleep 0.5s
    done

    echo "SOURCE COUNT = $src_count"
    echo "DEST COUNT   = $dest_count"

    if (( src_count != dest_count )); then
        echo "ERROR: record count mismatch"
        return 1
    fi

    echo -e "*** TEST PASSED\n"
)

# Run up to 10000 iterations. A failed reset_databases skips that
# iteration's test. A failing run_test is the final command of the && list,
# so `set -e` ends the script right there and leaves both servers running
# for inspection.
for ((TEST_NUM = 1; TEST_NUM <= 10000; TEST_NUM++)); do
    printf '%b\n' "${CYAN}*** STARTING TEST #$TEST_NUM${RESET}"
    reset_databases && run_test
done
```

Attachment