From ef27d6eba619f5106915de6b05cbeb5294263c30 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Mon, 19 Feb 2024 14:35:55 +0900 Subject: [PATCH v2 1/2] injection_points: Add routines to wait and wake processes This commit is made of two parts: - A new callback that can be attached to a process to make it wait on a condition variable. The condition checked is registered in shared memory by the module injection_points. - A new SQL function to update the shared state and broadcast the update using a condition variable. The shared state used by the module is registered using the DSM registry, and is optional. --- .../injection_points--1.0.sql | 10 ++ .../injection_points/injection_points.c | 104 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 115 insertions(+) diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql index 5944c41716..20479991f2 100644 --- a/src/test/modules/injection_points/injection_points--1.0.sql +++ b/src/test/modules/injection_points/injection_points--1.0.sql @@ -24,6 +24,16 @@ RETURNS void AS 'MODULE_PATHNAME', 'injection_points_run' LANGUAGE C STRICT PARALLEL UNSAFE; +-- +-- injection_points_wake() +-- +-- Wakes a condition variable waited on in an injection point. +-- +CREATE FUNCTION injection_points_wake() +RETURNS void +AS 'MODULE_PATHNAME', 'injection_points_wake' +LANGUAGE C STRICT PARALLEL UNSAFE; + -- -- injection_points_detach() -- diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index e843e6594f..3a319b1525 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -18,18 +18,67 @@ #include "postgres.h" #include "fmgr.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "storage/dsm_registry.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/wait_event.h" PG_MODULE_MAGIC; +/* Shared state information for injection points. */ +typedef struct InjectionPointSharedState +{ + /* protects accesses to wait_counts */ + slock_t lock; + + /* Counter advancing when injection_points_wake() is called */ + int wait_counts; + + /* + * Condition variable that can be used in an injection point, checking + * upon wait_counts when waiting. + */ + ConditionVariable wait_point; +} InjectionPointSharedState; + +/* Pointer to shared-memory state. */ +static InjectionPointSharedState *inj_state = NULL; + +/* Wait event when waiting on condition variable */ +static uint32 injection_wait_event = 0; + extern PGDLLEXPORT void injection_error(const char *name); extern PGDLLEXPORT void injection_notice(const char *name); +extern PGDLLEXPORT void injection_wait(const char *name); +static void +injection_point_init_state(void *ptr) +{ + InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; + + state->wait_counts = 0; + SpinLockInit(&state->lock); + ConditionVariableInit(&state->wait_point); +} + +static void +injection_init_shmem(void) +{ + bool found; + + if (inj_state != NULL) + return; + + inj_state = GetNamedDSMSegment("injection_points", + sizeof(InjectionPointSharedState), + injection_point_init_state, + &found); +} + /* Set of callbacks available to be attached to an injection point. */ void injection_error(const char *name) @@ -43,6 +92,38 @@ injection_notice(const char *name) elog(NOTICE, "notice triggered for injection point %s", name); } +/* Wait on a condition variable, awaken by injection_points_wake() */ +void +injection_wait(const char *name) +{ + int old_wait_counts; + + if (inj_state == NULL) + injection_init_shmem(); + if (injection_wait_event == 0) + injection_wait_event = WaitEventExtensionNew("injection_wait"); + + SpinLockAcquire(&inj_state->lock); + old_wait_counts = inj_state->wait_counts; + SpinLockRelease(&inj_state->lock); + + /* And sleep.. */ + ConditionVariablePrepareToSleep(&inj_state->wait_point); + for (;;) + { + int new_wait_counts; + + SpinLockAcquire(&inj_state->lock); + new_wait_counts = inj_state->wait_counts; + SpinLockRelease(&inj_state->lock); + + if (old_wait_counts != new_wait_counts) + break; + ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + } + ConditionVariableCancelSleep(); +} + /* * SQL function for creating an injection point. */ @@ -58,6 +139,8 @@ injection_points_attach(PG_FUNCTION_ARGS) function = "injection_error"; else if (strcmp(action, "notice") == 0) function = "injection_notice"; + else if (strcmp(action, "wait") == 0) + function = "injection_wait"; else elog(ERROR, "incorrect action \"%s\" for injection point creation", action); @@ -80,6 +163,27 @@ injection_points_run(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * SQL function for waking a condition variable. + */ +PG_FUNCTION_INFO_V1(injection_points_wake); +Datum +injection_points_wake(PG_FUNCTION_ARGS) +{ + + if (inj_state == NULL) + injection_init_shmem(); + + /* First bump the wait counter */ + SpinLockAcquire(&inj_state->lock); + inj_state->wait_counts++; + SpinLockRelease(&inj_state->lock); + + /* And broadcast the change for the waiters */ + ConditionVariableBroadcast(&inj_state->wait_point); + PG_RETURN_VOID(); +} + /* * SQL function for dropping an injection point. */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d808aad8b0..d7eca00502 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function InitializeWorkerForeignScan_function InjectionPointCacheEntry InjectionPointEntry +InjectionPointSharedState InlineCodeBlock InsertStmt Instrumentation -- 2.43.0