From 038b0f55dfe80f168fdc1b01b8cdadbf38fedfa2 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Thu, 28 May 2026 11:15:33 +0900 Subject: [PATCH] injection_points: Switch wait/wakeup to rely on atomics This removes the dependency based on counters and environment variables, replacing the waiting loop by a wait on an atomic counter, whose check increases over time in an exponential manner (starts at 10us, up to 100ms). --- .../injection_points/injection_points.c | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index ba282e3dcabf..9b8e1aaad0b0 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -23,11 +23,11 @@ #include "miscadmin.h" #include "nodes/pg_list.h" #include "nodes/value.h" -#include "storage/condition_variable.h" #include "storage/dsm_registry.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "storage/spin.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/injection_point.h" @@ -59,13 +59,10 @@ typedef struct InjectionPointSharedState slock_t lock; /* Counters advancing when injection_points_wakeup() is called */ - uint32 wait_counts[INJ_MAX_WAIT]; + pg_atomic_uint32 wait_counts[INJ_MAX_WAIT]; /* Names of injection points attached to wait counters */ char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN]; - - /* Condition variable used for waits and wakeups */ - ConditionVariable wait_point; } InjectionPointSharedState; /* Pointer to shared-memory state. */ @@ -102,9 +99,9 @@ injection_point_init_state(void *ptr, void *arg) InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; SpinLockInit(&state->lock); - memset(state->wait_counts, 0, sizeof(state->wait_counts)); memset(state->name, 0, sizeof(state->name)); - ConditionVariableInit(&state->wait_point); + for (int i = 0; i < INJ_MAX_WAIT; i++) + pg_atomic_init_u32(&state->wait_counts[i], 0); } static void @@ -222,7 +219,7 @@ injection_notice(const char *name, const void *private_data, void *arg) elog(NOTICE, "notice triggered for injection point %s", name); } -/* Wait on a condition variable, awaken by injection_points_wakeup() */ +/* Wait until injection_points_wakeup() is called */ void injection_wait(const char *name, const void *private_data, void *arg) { @@ -254,31 +251,37 @@ injection_wait(const char *name, const void *private_data, void *arg) { index = i; strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN); - old_wait_counts = inj_state->wait_counts[i]; + old_wait_counts = pg_atomic_read_u32(&inj_state->wait_counts[i]); break; } } SpinLockRelease(&inj_state->lock); if (index < 0) - elog(ERROR, "could not find free slot for wait of injection point %s ", + elog(ERROR, "could not find free slot for wait of injection point %s", name); - /* And sleep.. */ - ConditionVariablePrepareToSleep(&inj_state->wait_point); - for (;;) + /* + * Wait until the counter is bumped by injection_points_wakeup(). + * + * This loop starts with a short delay for responsiveness, enlarged to + * ease the CPU workload in slower environments. + */ +#define INJ_WAIT_INITIAL_US 10 /* 10us */ +#define INJ_WAIT_MAX_US 100000 /* 100ms */ + pgstat_report_wait_start(injection_wait_event); { - uint32 new_wait_counts; + int delay_us = INJ_WAIT_INITIAL_US; - SpinLockAcquire(&inj_state->lock); - new_wait_counts = inj_state->wait_counts[index]; - SpinLockRelease(&inj_state->lock); - - if (old_wait_counts != new_wait_counts) - break; - ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + while (pg_atomic_read_u32(&inj_state->wait_counts[index]) == old_wait_counts) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(delay_us); + if (delay_us < INJ_WAIT_MAX_US) + delay_us *= 2; + } } - ConditionVariableCancelSleep(); + pgstat_report_wait_end(); /* Remove this injection point from the waiters. */ SpinLockAcquire(&inj_state->lock); @@ -443,7 +446,7 @@ injection_points_wakeup(PG_FUNCTION_ARGS) if (inj_state == NULL) injection_init_shmem(); - /* First bump the wait counter for the injection point to wake up */ + /* Find the injection point then bump its wait counter */ SpinLockAcquire(&inj_state->lock); for (int i = 0; i < INJ_MAX_WAIT; i++) { @@ -458,11 +461,9 @@ injection_points_wakeup(PG_FUNCTION_ARGS) SpinLockRelease(&inj_state->lock); elog(ERROR, "could not find injection point %s to wake up", name); } - inj_state->wait_counts[index]++; SpinLockRelease(&inj_state->lock); - /* And broadcast the change to the waiters */ - ConditionVariableBroadcast(&inj_state->wait_point); + pg_atomic_fetch_add_u32(&inj_state->wait_counts[index], 1); PG_RETURN_VOID(); } -- 2.54.0