Thread: CTEs and concurrency
Dear,
I am currently running a chaos test on a system (essentially starting nodes that process something and randomly knocking them out). It appeared to work fine with regular tests, but I am seeing occasional duplicate key value violations of a uniqueness constraint on one of the more complex CTE-based queries. This only happens under concurrency, when nodes restart and there is ample load.
I cannot reproduce it by taking out the query and running it manually in pgAdmin; when I do so, it behaves fine and does exactly what I expect.
The query looks like this (it uses Rust SQLX which is why there is some unnesting happening on the parameters).
WITH unnested_inputs AS (
    SELECT * FROM (
        SELECT
            unnest($1::uuid[]) AS event_id,
            unnest($2::varchar[]) AS type,
            unnest($3::int[]) AS version,
            unnest($4::uuid[]) AS causation_id,
            unnest($5::uuid[]) AS correlation_id,
            unnest($6::text[]) AS idempotency_key,
            unnest($7::jsonb[]) AS data,
            unnest($8::jsonb[]) AS metadata,
            unnest($9::text[]) AS subscription_id,
            unnest($10::text[]) AS subscription_instance_identifier,
            unnest($11::bigint[]) AS applied_order_id
    ) AS inputs
),
to_update_subscription_logs AS (
    SELECT sl.id AS subscription_log_id, sl.node_id, sl.status, ui.*
    FROM subscription_log sl
    JOIN unnested_inputs ui
        ON sl.event_id = ui.causation_id
        AND sl.node_id = $12
        AND sl.status = 'assigned'
        AND sl.subscription_id = ui.subscription_id
        AND sl.subscription_instance_identifier = ui.subscription_instance_identifier
    FOR UPDATE NOWAIT -- if something is updating it, we probably shouldn't touch it anymore.
),
updated_logs AS (
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
    WHERE sl.id = usl.subscription_log_id
        AND usl.node_id = $12
),
inserted_event_log AS (
    INSERT INTO event_log (
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, created_at
    )
    SELECT
        event_id, type, version, usl.causation_id, correlation_id,
        idempotency_key, data, metadata, CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
),
inserted_output_routing_info AS (
    INSERT INTO output_event_routing (event_id, subscription_id, subscription_instance_identifier, applied_order_id)
    SELECT event_id, subscription_id, subscription_instance_identifier, applied_order_id
    FROM to_update_subscription_logs usl
)
SELECT * FROM to_update_subscription_logs
The tables look as follows:
CREATE TABLE event_log (
    event_id UUID PRIMARY KEY,
    event_order_id BIGINT REFERENCES event(order_id),
    type varchar NOT NULL,
    version int NOT NULL,
    causation_id UUID,
    correlation_id UUID,
    idempotency_key TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT constraint_event_log_unique_idempotency_key UNIQUE(idempotency_key) -- idempotent writes.
);
CREATE TABLE output_event_routing (
    event_id UUID REFERENCES event_log(event_id),
    subscription_id TEXT NOT NULL,
    subscription_instance_identifier TEXT,
    applied_order_id BIGINT,
    CONSTRAINT constraint_output_event_routing_uniqueness UNIQUE(subscription_id, subscription_instance_identifier, applied_order_id)
);
CREATE TABLE subscription_log (
    id UUID NOT NULL PRIMARY KEY,
    event_id UUID NOT NULL,
    event_order_id BIGINT NOT NULL,
    event_correlation_id UUID NOT NULL,
    subscription_instance_identifier TEXT NOT NULL,
    subscription_id TEXT NOT NULL REFERENCES subscription(name),
    status processing_status NOT NULL DEFAULT 'enqueued',
    node_id UUID REFERENCES node(id) -- is null until assigned.
);
Since I'm trying to avoid using PL/pgSQL upon request I tried to achieve the following behaviour in CTEs:
- For given events, update the subscription log to 'processed' only if we are still the node that is processing them and the status is still 'assigned'.
- Only for the events where the previous step succeeded, continue processing by inserting into event_log and output_event_routing (the inserted_event_log and inserted_output_routing_info CTEs).
The mechanism aims to make sure we don't insert results of event processing twice.
When logging the input values, we can see that there are indeed two times the same value sets (exactly the same) passed for different nodes. That is to be expected, and it is exactly what this logic has to catch: same values, but another node. What we see is that one node succeeds and the other fails with the uniqueness violation. That is actually fine from a business perspective, since rolling back has the same effect, albeit with an error that I didn't expect. However, I would love to understand this: how can one node succeed, set the status of the log to 'processed' and go on to insert the values, while the other is apparently also able to go on inserting (which means both nodes saw 'assigned' in the select when it locked the row for update)? Is there something I do not fully understand about how CTEs work in combination with locks?
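For reference, the locking behaviour being assumed here can be sketched as a two-session walkthrough. It uses a hypothetical demo_log table rather than the real schema, and it assumes the default READ COMMITTED isolation level; the names and values are for illustration only.

```sql
-- Hypothetical toy table, standing in for subscription_log.
CREATE TABLE demo_log (
    id      int PRIMARY KEY,
    node_id int,
    status  text NOT NULL
);
INSERT INTO demo_log VALUES (1, 42, 'assigned');

-- Session A
BEGIN;
SELECT id FROM demo_log
WHERE id = 1 AND status = 'assigned'
FOR UPDATE;                 -- returns the row and locks it
UPDATE demo_log SET status = 'processed' WHERE id = 1;

-- Session B, started while A's transaction is still open
BEGIN;
SELECT id FROM demo_log
WHERE id = 1 AND status = 'assigned'
FOR UPDATE;                 -- blocks, waiting for A's row lock

-- Session A
COMMIT;

-- Session B now resumes. Under READ COMMITTED the WHERE clause is
-- re-evaluated against the newly committed row version; status is
-- 'processed', so the row no longer qualifies and zero rows come back.
-- With FOR UPDATE NOWAIT, B would instead fail immediately with a
-- lock-not-available error; with SKIP LOCKED it would return zero rows
-- without waiting.
COMMIT;
```

If the real query behaves like session B, a second node passing the same causation_id should never reach the insert CTEs with a non-empty row set.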
Things I tried:
1. Whether I go for regular FOR UPDATE, SKIP LOCKED or NOWAIT makes no difference.
2. I do return to_update_subscription_logs at the end to be sure the lock is held, even though it's also used by the next CTE (I'm aware of that CTE behaviour for selects).
3. Changing it to UPDATE ... RETURNING (which was my original logic):
updated_subscription AS (
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP
    FROM unnested_inputs ui
    WHERE sl.event_id = ui.causation_id
        AND sl.node_id = $12
        AND sl.status = 'assigned' -- Assuming you're updating from 'assigned'
    RETURNING ui.causation_id
)
Then I do a DISTINCT on the causation ID and only allow inserts for values whose causation ID was successfully updated to 'processed'. This yields exactly the same behaviour: everything goes fine, but once in a while things go wrong when a node dies and another takes over.
4. With both approaches, I also tried more explicit checks in the joins of the insert CTEs, to make sure the node is still the same etc. These shouldn't be necessary, and they didn't change anything either.
5. After the error we can see that the succeeding node successfully set the status to 'processed' and that it 'owns' the subscription_log entry for that processed event. This means the other node should not have been able to get past the lock.
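The variant from item 3, with the inserts gated on the rows the UPDATE actually touched, can be sketched roughly as follows. This is a reconstruction under assumptions, not the exact query that was run: the gated_inputs CTE name and the final SELECT are hypothetical, and updated_at is carried over from the main query although it does not appear in the posted DDL.

```sql
WITH unnested_inputs AS (
    SELECT
        unnest($1::uuid[])    AS event_id,
        unnest($2::varchar[]) AS type,
        unnest($3::int[])     AS version,
        unnest($4::uuid[])    AS causation_id,
        unnest($5::uuid[])    AS correlation_id,
        unnest($6::text[])    AS idempotency_key,
        unnest($7::jsonb[])   AS data,
        unnest($8::jsonb[])   AS metadata,
        unnest($9::text[])    AS subscription_id,
        unnest($10::text[])   AS subscription_instance_identifier,
        unnest($11::bigint[]) AS applied_order_id
),
updated_subscription AS (
    -- Flip 'assigned' to 'processed' only for rows this node still owns.
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP  -- column assumed; not in the posted DDL
    FROM unnested_inputs ui
    WHERE sl.event_id = ui.causation_id
      AND sl.node_id = $12
      AND sl.status = 'assigned'
    RETURNING ui.causation_id
),
gated_inputs AS (
    -- Hypothetical helper: keep only inputs whose cause was just updated.
    SELECT ui.*
    FROM unnested_inputs ui
    WHERE ui.causation_id IN (
        SELECT DISTINCT causation_id FROM updated_subscription
    )
),
inserted_event_log AS (
    INSERT INTO event_log (
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, created_at
    )
    SELECT
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, CURRENT_TIMESTAMP
    FROM gated_inputs
),
inserted_output_routing_info AS (
    INSERT INTO output_event_routing (
        event_id, subscription_id, subscription_instance_identifier, applied_order_id
    )
    SELECT event_id, subscription_id, subscription_instance_identifier, applied_order_id
    FROM gated_inputs
)
-- Data-modifying CTEs run even when the final SELECT does not reference them.
SELECT causation_id FROM updated_subscription;
```

If the UPDATE matches nothing (for example because another node already set the row to 'processed'), gated_inputs is empty and both inserts write zero rows.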
If anyone could provide some insight into what I'm missing about how CTEs work, that would be amazing. Sorry that it's a rather complex case, which makes it hard to come up with something 'smaller' that reproduces it under the same tests.
My apologies, when I said:
"When logging the input values, we can see that there are indeed two times
the same value sets (exactly the same)"
It seems I was wrong: the UUIDs differed by 1-2 characters. So my assumptions about how this query should behave do seem to be correct, and the problem lies elsewhere in the system.
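Near-identical UUIDs are easy to misread in logs, so one way to check two logged batches is to let the database compare them. The sketch below is only an illustration: the UUID values and the batch_a and batch_b names are made up, and the real inputs would be the arrays logged for each node.

```sql
-- Hypothetical values; the second UUID in each batch differs in the
-- last character only.
WITH batch_a(id) AS (
    SELECT unnest(ARRAY[
        '3f2b8c1e-0000-4000-8000-0000000000a1',
        '3f2b8c1e-0000-4000-8000-0000000000b2'
    ]::uuid[])
),
batch_b(id) AS (
    SELECT unnest(ARRAY[
        '3f2b8c1e-0000-4000-8000-0000000000a1',
        '3f2b8c1e-0000-4000-8000-0000000000b3'
    ]::uuid[])
)
-- One row per value that appears in only one of the two batches.
SELECT a.id AS only_in_a, b.id AS only_in_b
FROM batch_a a
FULL JOIN batch_b b ON a.id = b.id
WHERE a.id IS NULL OR b.id IS NULL;
```

An empty result would mean the two batches really do carry the same IDs; any returned row points at a value that only looks the same.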
On Mar 4, 2024, at 22:44, Brecht De Rooms <databrecht@gmail.com> wrote:
> [...]