From 75302cba302b83ce2a6d6eaf30b163f473b87276 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Wed, 21 Feb 2024 16:36:25 +0900 Subject: [PATCH v2 1/2] injection_points: Add routines to wait and wake processes This commit is made of two parts: - A new callback that can be attached to a process to make it wait on a condition variable. The condition checked is registered in shared memory by the module injection_points. - A new SQL function to update the shared state and broadcast the update using a condition variable. The shared state used by the module is registered using the DSM registry, and is optional. --- .../injection_points--1.0.sql | 10 ++ .../injection_points/injection_points.c | 151 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 162 insertions(+) diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql index 5944c41716..eed0310cf6 100644 --- a/src/test/modules/injection_points/injection_points--1.0.sql +++ b/src/test/modules/injection_points/injection_points--1.0.sql @@ -24,6 +24,16 @@ RETURNS void AS 'MODULE_PATHNAME', 'injection_points_run' LANGUAGE C STRICT PARALLEL UNSAFE; +-- +-- injection_points_wakeup() +-- +-- Wakes a condition variable waited on in an injection point. +-- +CREATE FUNCTION injection_points_wakeup(IN point_name TEXT) +RETURNS void +AS 'MODULE_PATHNAME', 'injection_points_wakeup' +LANGUAGE C STRICT PARALLEL UNSAFE; + -- -- injection_points_detach() -- diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index e843e6594f..052b20f9c8 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -18,18 +18,72 @@ #include "postgres.h" #include "fmgr.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "storage/dsm_registry.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/wait_event.h" PG_MODULE_MAGIC; +/* Maximum number of wait usable in injection points at once */ +#define INJ_MAX_WAIT 32 +#define INJ_NAME_MAXLEN 64 + +/* Shared state information for injection points. */ +typedef struct InjectionPointSharedState +{ + /* protects accesses to wait_counts */ + slock_t lock; + + /* Counters advancing when injection_points_wakeup() is called */ + int wait_counts[INJ_MAX_WAIT]; + + /* Names of injection points attached to wait counters */ + char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN]; + + /* + * Condition variable used for waits and wakeups, checking upon the set of + * wait_counts when waiting. + */ + ConditionVariable wait_point; +} InjectionPointSharedState; + +/* Pointer to shared-memory state. */ +static InjectionPointSharedState *inj_state = NULL; + extern PGDLLEXPORT void injection_error(const char *name); extern PGDLLEXPORT void injection_notice(const char *name); +extern PGDLLEXPORT void injection_wait(const char *name); +static void +injection_point_init_state(void *ptr) +{ + InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; + + SpinLockInit(&state->lock); + memset(state->wait_counts, 0, sizeof(state->wait_counts)); + memset(state->name, 0, sizeof(state->name)); + ConditionVariableInit(&state->wait_point); +} + +static void +injection_init_shmem(void) +{ + bool found; + + if (inj_state != NULL) + return; + + inj_state = GetNamedDSMSegment("injection_points", + sizeof(InjectionPointSharedState), + injection_point_init_state, + &found); +} + /* Set of callbacks available to be attached to an injection point. */ void injection_error(const char *name) @@ -43,6 +97,65 @@ injection_notice(const char *name) elog(NOTICE, "notice triggered for injection point %s", name); } +/* Wait on a condition variable, awaken by injection_points_wakeup() */ +void +injection_wait(const char *name) +{ + int old_wait_counts = -1; + int index = -1; + uint32 injection_wait_event = 0; + + if (inj_state == NULL) + injection_init_shmem(); + + /* + * This custom wait event name is not released, but we don't care much for + * testing as this will be short-lived. + */ + injection_wait_event = WaitEventExtensionNew(name); + + /* + * Find a free slot to wait for, and register this injection point's name. + */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (inj_state->name[i][0] == '\0') + { + index = i; + strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN); + old_wait_counts = inj_state->wait_counts[i]; + break; + } + } + SpinLockRelease(&inj_state->lock); + + if (index < 0) + elog(ERROR, "could not find free slot for wait of injection point %s ", + name); + + /* And sleep.. */ + ConditionVariablePrepareToSleep(&inj_state->wait_point); + for (;;) + { + int new_wait_counts; + + SpinLockAcquire(&inj_state->lock); + new_wait_counts = inj_state->wait_counts[index]; + SpinLockRelease(&inj_state->lock); + + if (old_wait_counts != new_wait_counts) + break; + ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + } + ConditionVariableCancelSleep(); + + /* Remove us from the waiting list */ + SpinLockAcquire(&inj_state->lock); + inj_state->name[index][0] = '\0'; + SpinLockRelease(&inj_state->lock); +} + /* * SQL function for creating an injection point. */ @@ -58,6 +171,8 @@ injection_points_attach(PG_FUNCTION_ARGS) function = "injection_error"; else if (strcmp(action, "notice") == 0) function = "injection_notice"; + else if (strcmp(action, "wait") == 0) + function = "injection_wait"; else elog(ERROR, "incorrect action \"%s\" for injection point creation", action); @@ -80,6 +195,42 @@ injection_points_run(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * SQL function for waking a condition variable. + */ +PG_FUNCTION_INFO_V1(injection_points_wakeup); +Datum +injection_points_wakeup(PG_FUNCTION_ARGS) +{ + char *name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + int index = -1; + + if (inj_state == NULL) + injection_init_shmem(); + + /* First bump the wait counter for the injection point to wake */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (strcmp(name, inj_state->name[i]) == 0) + { + index = i; + break; + } + } + if (index < 0) + { + SpinLockRelease(&inj_state->lock); + elog(ERROR, "could not find injection point %s to wake", name); + } + inj_state->wait_counts[index]++; + SpinLockRelease(&inj_state->lock); + + /* And broadcast the change for the waiters */ + ConditionVariableBroadcast(&inj_state->wait_point); + PG_RETURN_VOID(); +} + /* * SQL function for dropping an injection point. */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d808aad8b0..d7eca00502 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function InitializeWorkerForeignScan_function InjectionPointCacheEntry InjectionPointEntry +InjectionPointSharedState InlineCodeBlock InsertStmt Instrumentation -- 2.43.0