From 0ccbacd01b326078aa750f1a72f38823f8f9c8b3 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Thu, 22 Feb 2024 11:53:29 +0900 Subject: [PATCH v3 1/2] injection_points: Add routines to wait and wake processes This commit is made of two parts: - A new callback that can be attached to a process to make it wait on a condition variable. The condition checked is registered in shared memory by the module injection_points. - A new SQL function to update the shared state and broadcast the update using a condition variable. The shared state used by the module is registered using the DSM registry, and is optional. --- .../injection_points--1.0.sql | 10 ++ .../injection_points/injection_points.c | 155 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 166 insertions(+) diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql index 5944c41716..0a2e59aba7 100644 --- a/src/test/modules/injection_points/injection_points--1.0.sql +++ b/src/test/modules/injection_points/injection_points--1.0.sql @@ -24,6 +24,16 @@ RETURNS void AS 'MODULE_PATHNAME', 'injection_points_run' LANGUAGE C STRICT PARALLEL UNSAFE; +-- +-- injection_points_wakeup() +-- +-- Wakes up a waiting injection point. +-- +CREATE FUNCTION injection_points_wakeup(IN point_name TEXT) +RETURNS void +AS 'MODULE_PATHNAME', 'injection_points_wakeup' +LANGUAGE C STRICT PARALLEL UNSAFE; + -- -- injection_points_detach() -- diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index e843e6594f..142211d0d7 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -18,18 +18,76 @@ #include "postgres.h" #include "fmgr.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "storage/dsm_registry.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/wait_event.h" PG_MODULE_MAGIC; +/* Maximum number of wait usable in injection points at once */ +#define INJ_MAX_WAIT 32 +#define INJ_NAME_MAXLEN 64 + +/* Shared state information for injection points. */ +typedef struct InjectionPointSharedState +{ + /* protects accesses to wait_counts */ + slock_t lock; + + /* Counters advancing when injection_points_wakeup() is called */ + uint32 wait_counts[INJ_MAX_WAIT]; + + /* Names of injection points attached to wait counters */ + char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN]; + + /* + * Condition variable used for waits and wakeups, checking upon the set of + * wait_counts when waiting. + */ + ConditionVariable wait_point; +} InjectionPointSharedState; + +/* Pointer to shared-memory state. */ +static InjectionPointSharedState *inj_state = NULL; + extern PGDLLEXPORT void injection_error(const char *name); extern PGDLLEXPORT void injection_notice(const char *name); +extern PGDLLEXPORT void injection_wait(const char *name); +/* Callback for shmem area initialization */ +static void +injection_point_init_state(void *ptr) +{ + InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; + + SpinLockInit(&state->lock); + memset(state->wait_counts, 0, sizeof(state->wait_counts)); + memset(state->name, 0, sizeof(state->name)); + ConditionVariableInit(&state->wait_point); +} + +/* + * Initialize shared memory area for this module. + */ +static void +injection_init_shmem(void) +{ + bool found; + + if (inj_state != NULL) + return; + + inj_state = GetNamedDSMSegment("injection_points", + sizeof(InjectionPointSharedState), + injection_point_init_state, + &found); +} + /* Set of callbacks available to be attached to an injection point. */ void injection_error(const char *name) @@ -43,6 +101,65 @@ injection_notice(const char *name) elog(NOTICE, "notice triggered for injection point %s", name); } +/* Wait on a condition variable, awaken by injection_points_wakeup() */ +void +injection_wait(const char *name) +{ + uint32 old_wait_counts = 0; + int index = -1; + uint32 injection_wait_event = 0; + + if (inj_state == NULL) + injection_init_shmem(); + + /* + * This custom wait event name is not released, but we don't care much for + * testing as this will be short-lived. + */ + injection_wait_event = WaitEventExtensionNew(name); + + /* + * Find a free slot to wait for, and register this injection point's name. + */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (inj_state->name[i][0] == '\0') + { + index = i; + strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN); + old_wait_counts = inj_state->wait_counts[i]; + break; + } + } + SpinLockRelease(&inj_state->lock); + + if (index < 0) + elog(ERROR, "could not find free slot for wait of injection point %s ", + name); + + /* And sleep.. */ + ConditionVariablePrepareToSleep(&inj_state->wait_point); + for (;;) + { + uint32 new_wait_counts; + + SpinLockAcquire(&inj_state->lock); + new_wait_counts = inj_state->wait_counts[index]; + SpinLockRelease(&inj_state->lock); + + if (old_wait_counts != new_wait_counts) + break; + ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + } + ConditionVariableCancelSleep(); + + /* Remove this injection point from the waiters. */ + SpinLockAcquire(&inj_state->lock); + inj_state->name[index][0] = '\0'; + SpinLockRelease(&inj_state->lock); +} + /* * SQL function for creating an injection point. */ @@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS) function = "injection_error"; else if (strcmp(action, "notice") == 0) function = "injection_notice"; + else if (strcmp(action, "wait") == 0) + function = "injection_wait"; else elog(ERROR, "incorrect action \"%s\" for injection point creation", action); @@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * SQL function for waking up an injection point waiting in injection_wait(). + */ +PG_FUNCTION_INFO_V1(injection_points_wakeup); +Datum +injection_points_wakeup(PG_FUNCTION_ARGS) +{ + char *name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + int index = -1; + + if (inj_state == NULL) + injection_init_shmem(); + + /* First bump the wait counter for the injection point to wake up */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (strcmp(name, inj_state->name[i]) == 0) + { + index = i; + break; + } + } + if (index < 0) + { + SpinLockRelease(&inj_state->lock); + elog(ERROR, "could not find injection point %s to wake up", name); + } + inj_state->wait_counts[index]++; + SpinLockRelease(&inj_state->lock); + + /* And broadcast the change for the waiters */ + ConditionVariableBroadcast(&inj_state->wait_point); + PG_RETURN_VOID(); +} + /* * SQL function for dropping an injection point. */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d808aad8b0..d7eca00502 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function InitializeWorkerForeignScan_function InjectionPointCacheEntry InjectionPointEntry +InjectionPointSharedState InlineCodeBlock InsertStmt Instrumentation -- 2.43.0