From 589948c6e44b99133fc6ce46ee9de33a254f4d11 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 21 Mar 2026 12:43:16 +0200 Subject: [PATCH v7 16/18] Convert AIO to the new interface This replaces the "shmem_size" and "shmem_init" callbacks in the IO methods table with the same ShmemCallback struct that we now use in other subsystems --- src/backend/storage/aio/aio_init.c | 119 ++++++++++++++-------- src/backend/storage/aio/method_io_uring.c | 42 ++++---- src/backend/storage/aio/method_worker.c | 84 ++++++++------- src/backend/storage/ipc/ipci.c | 2 - src/include/storage/aio_internal.h | 16 +-- src/include/storage/aio_subsys.h | 4 - src/include/storage/subsystemlist.h | 3 + 7 files changed, 155 insertions(+), 115 deletions(-) diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index d3c68d8b04c..54ab1238131 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -23,16 +23,46 @@ #include "storage/ipc.h" #include "storage/proc.h" #include "storage/shmem.h" +#include "storage/subsystems.h" #include "utils/guc.h" - -static Size -AioCtlShmemSize(void) -{ - /* pgaio_ctl itself */ - return sizeof(PgAioCtl); -} +static void AioShmemRequest(void *arg); +static void AioShmemInit(void *arg); +static void AioShmemAttach(void *arg); + +const ShmemCallbacks AioShmemCallbacks = { + .request_fn = AioShmemRequest, + .init_fn = AioShmemInit, + .attach_fn = AioShmemAttach, +}; + +static ShmemStructDesc AioCtlShmemDesc = { + .name = "AioCtl", + .size = sizeof(PgAioCtl), + .ptr = (void **) &pgaio_ctl, +}; + +static PgAioBackend *AioBackendShmemPtr; +static ShmemStructDesc AioBackendShmemDesc = { + .name = "AioBackend", + .ptr = (void **) &AioBackendShmemPtr, +}; +static PgAioHandle *AioHandleShmemPtr; +static ShmemStructDesc AioHandleShmemDesc = { + .name = "AioHandle", + .ptr = (void **) &AioHandleShmemPtr, +}; +static struct iovec *AioHandleIOVShmemPtr; +static ShmemStructDesc AioHandleIOVShmemDesc = { + .name = "AioHandleIOV", + .ptr = (void **) &AioHandleIOVShmemPtr, +}; +static uint64 *AioHandleDataShmemPtr; +static ShmemStructDesc AioHandleDataShmemDesc = { + .name = "AioHandleData", + .ptr = (void **) &AioHandleDataShmemPtr, +}; static uint32 AioProcs(void) @@ -109,10 +139,13 @@ AioChooseMaxConcurrency(void) return Min(max_proportional_pins, 64); } -Size -AioShmemSize(void) +/* + * Register shared memory area for AIO subsystem. + */ +static void +AioShmemRequest(void *arg) { - Size sz = 0; + /* Resolve io_max_concurrency if not already done. */ /* * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT. @@ -132,48 +165,41 @@ AioShmemSize(void) PGC_S_OVERRIDE); } - sz = add_size(sz, AioCtlShmemSize()); - sz = add_size(sz, AioBackendShmemSize()); - sz = add_size(sz, AioHandleShmemSize()); - sz = add_size(sz, AioHandleIOVShmemSize()); - sz = add_size(sz, AioHandleDataShmemSize()); + ShmemRequestStruct(&AioCtlShmemDesc); - /* Reserve space for method specific resources. */ - if (pgaio_method_ops->shmem_size) - sz = add_size(sz, pgaio_method_ops->shmem_size()); + AioBackendShmemDesc.size = AioBackendShmemSize(); + ShmemRequestStruct(&AioBackendShmemDesc); - return sz; + AioHandleShmemDesc.size = AioHandleShmemSize(); + ShmemRequestStruct(&AioHandleShmemDesc); + + AioHandleIOVShmemDesc.size = AioHandleIOVShmemSize(); + ShmemRequestStruct(&AioHandleIOVShmemDesc); + + AioHandleDataShmemDesc.size = AioHandleDataShmemSize(); + ShmemRequestStruct(&AioHandleDataShmemDesc); + + if (pgaio_method_ops->shmem_callbacks.request_fn) + pgaio_method_ops->shmem_callbacks.request_fn(pgaio_method_ops->shmem_callbacks.request_fn_arg); } -void -AioShmemInit(void) +/* + * Initialize AIO shared memory during postmaster startup. + */ +static void +AioShmemInit(void *arg) { - bool found; uint32 io_handle_off = 0; uint32 iovec_off = 0; uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit; - pgaio_ctl = (PgAioCtl *) - ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found); - - if (found) - goto out; - - memset(pgaio_ctl, 0, AioCtlShmemSize()); - pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency; pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs; - pgaio_ctl->backend_state = (PgAioBackend *) - ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found); - - pgaio_ctl->io_handles = (PgAioHandle *) - ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found); - - pgaio_ctl->iovecs = (struct iovec *) - ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found); - pgaio_ctl->handle_data = (uint64 *) - ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found); + pgaio_ctl->backend_state = AioBackendShmemPtr; + pgaio_ctl->io_handles = AioHandleShmemPtr; + pgaio_ctl->iovecs = AioHandleIOVShmemPtr; + pgaio_ctl->handle_data = AioHandleDataShmemPtr; for (int procno = 0; procno < AioProcs(); procno++) { @@ -208,10 +234,15 @@ AioShmemInit(void) } } -out: - /* Initialize IO method specific resources. */ - if (pgaio_method_ops->shmem_init) - pgaio_method_ops->shmem_init(!found); + if (pgaio_method_ops->shmem_callbacks.init_fn) + pgaio_method_ops->shmem_callbacks.init_fn(pgaio_method_ops->shmem_callbacks.init_fn_arg); +} + +static void +AioShmemAttach(void *arg) +{ + if (pgaio_method_ops->shmem_callbacks.attach_fn) + pgaio_method_ops->shmem_callbacks.attach_fn(pgaio_method_ops->shmem_callbacks.attach_fn_arg); } void diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index 4867ded35ea..df2d01d66fa 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -49,8 +49,8 @@ /* Entry points for IoMethodOps. */ -static size_t pgaio_uring_shmem_size(void); -static void pgaio_uring_shmem_init(bool first_time); +static void pgaio_uring_shmem_request(void *arg); +static void pgaio_uring_shmem_init(void *arg); static void pgaio_uring_init_backend(void); static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios); static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation); @@ -58,7 +58,6 @@ static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation); /* helper functions */ static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe); - const IoMethodOps pgaio_uring_ops = { /* * While io_uring mostly is OK with FDs getting closed while the IO is in @@ -69,8 +68,8 @@ const IoMethodOps pgaio_uring_ops = { */ .wait_on_fd_before_close = true, - .shmem_size = pgaio_uring_shmem_size, - .shmem_init = pgaio_uring_shmem_init, + .shmem_callbacks.request_fn = pgaio_uring_shmem_request, + .shmem_callbacks.init_fn = pgaio_uring_shmem_init, .init_backend = pgaio_uring_init_backend, .submit = pgaio_uring_submit, @@ -265,23 +264,34 @@ pgaio_uring_shmem_size(void) { size_t sz; + sz = pgaio_uring_context_shmem_size(); + sz = add_size(sz, pgaio_uring_ring_shmem_size()); + + return sz; +} + +static void +pgaio_uring_shmem_request(void *arg) +{ + static ShmemStructDesc AioUringShmemDesc = { + .name = "AioUringContext", + .ptr = (void **) &pgaio_uring_contexts, + }; + /* * Kernel and liburing support for various features influences how much * shmem we need, perform the necessary checks. */ pgaio_uring_check_capabilities(); - sz = pgaio_uring_context_shmem_size(); - sz = add_size(sz, pgaio_uring_ring_shmem_size()); - - return sz; + AioUringShmemDesc.size = pgaio_uring_shmem_size(); + ShmemRequestStruct(&AioUringShmemDesc); } static void -pgaio_uring_shmem_init(bool first_time) +pgaio_uring_shmem_init(void *arg) { int TotalProcs = pgaio_uring_procs(); - bool found; char *shmem; size_t ring_mem_remain = 0; char *ring_mem_next = 0; @@ -289,13 +299,11 @@ pgaio_uring_shmem_init(bool first_time) /* * We allocate memory for all PgAioUringContext instances and, if * supported, the memory required for each of the io_uring instances, in - * one ShmemInitStruct(). + * one combined allocation. + * + * pgaio_uring_contexts is already set to the base of the allocation. */ - shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found); - if (found) - return; - - pgaio_uring_contexts = (PgAioUringContext *) shmem; + shmem = (char *) pgaio_uring_contexts; shmem += pgaio_uring_context_shmem_size(); /* if supported, handle memory alignment / sizing for io_uring memory */ diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index efe38e9f113..82c8b098a9e 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -41,6 +41,7 @@ #include "storage/ipc.h" #include "storage/latch.h" #include "storage/proc.h" +#include "storage/shmem.h" #include "tcop/tcopprot.h" #include "utils/injection_point.h" #include "utils/memdebug.h" @@ -73,16 +74,20 @@ typedef struct PgAioWorkerControl } PgAioWorkerControl; -static size_t pgaio_worker_shmem_size(void); -static void pgaio_worker_shmem_init(bool first_time); +static void pgaio_worker_shmem_request(void *arg); +static void pgaio_worker_shmem_init(void *arg); +static void pgaio_worker_shmem_attach(void *arg); + +static PgAioWorkerSubmissionQueue *io_worker_submission_queue; static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh); static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios); const IoMethodOps pgaio_worker_ops = { - .shmem_size = pgaio_worker_shmem_size, - .shmem_init = pgaio_worker_shmem_init, + .shmem_callbacks.request_fn = pgaio_worker_shmem_request, + .shmem_callbacks.init_fn = pgaio_worker_shmem_init, + .shmem_callbacks.attach_fn = pgaio_worker_shmem_attach, .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution, .submit = pgaio_worker_submit, @@ -95,7 +100,6 @@ int io_workers = 3; static int io_worker_queue_size = 64; static int MyIoWorkerId; -static PgAioWorkerSubmissionQueue *io_worker_submission_queue; static PgAioWorkerControl *io_worker_control; @@ -116,50 +120,60 @@ pgaio_worker_control_shmem_size(void) sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS; } -static size_t -pgaio_worker_shmem_size(void) +/* + * Set secondary AIO worker pointer from the combined allocation. + */ +static void +pgaio_worker_set_secondary_ptr(void) { - size_t sz; int queue_size; + Size queue_sz = pgaio_worker_queue_shmem_size(&queue_size); - sz = pgaio_worker_queue_shmem_size(&queue_size); - sz = add_size(sz, pgaio_worker_control_shmem_size()); - - return sz; + io_worker_control = (PgAioWorkerControl *) + ((char *) io_worker_submission_queue + MAXALIGN(queue_sz)); } static void -pgaio_worker_shmem_init(bool first_time) +pgaio_worker_shmem_init(void *arg) { - bool found; int queue_size; - io_worker_submission_queue = - ShmemInitStruct("AioWorkerSubmissionQueue", - pgaio_worker_queue_shmem_size(&queue_size), - &found); - if (!found) - { - io_worker_submission_queue->size = queue_size; - io_worker_submission_queue->head = 0; - io_worker_submission_queue->tail = 0; - } + pgaio_worker_queue_shmem_size(&queue_size); + io_worker_submission_queue->size = queue_size; + io_worker_submission_queue->head = 0; + io_worker_submission_queue->tail = 0; + + pgaio_worker_set_secondary_ptr(); - io_worker_control = - ShmemInitStruct("AioWorkerControl", - pgaio_worker_control_shmem_size(), - &found); - if (!found) + io_worker_control->idle_worker_mask = 0; + for (int i = 0; i < MAX_IO_WORKERS; ++i) { - io_worker_control->idle_worker_mask = 0; - for (int i = 0; i < MAX_IO_WORKERS; ++i) - { - io_worker_control->workers[i].latch = NULL; - io_worker_control->workers[i].in_use = false; - } + io_worker_control->workers[i].latch = NULL; + io_worker_control->workers[i].in_use = false; } } +static void +pgaio_worker_shmem_attach(void *arg) +{ + pgaio_worker_set_secondary_ptr(); +} + +static void +pgaio_worker_shmem_request(void *arg) +{ + static ShmemStructDesc AioWorkerShmemDesc = { + .name = "AioWorkerSubmissionQueue", + .ptr = (void **) &io_worker_submission_queue, + }; + int queue_size; + + AioWorkerShmemDesc.size = + MAXALIGN(pgaio_worker_queue_shmem_size(&queue_size)) + + pgaio_worker_control_shmem_size(); + ShmemRequestStruct(&AioWorkerShmemDesc); +} + static int pgaio_worker_choose_idle(void) { diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 49a946d3fae..b5f3df68963 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -118,7 +118,6 @@ CalculateShmemSize(void) size = add_size(size, SyncScanShmemSize()); size = add_size(size, StatsShmemSize()); size = add_size(size, SlotSyncShmemSize()); - size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); size = add_size(size, LogicalDecodingCtlShmemSize()); @@ -302,7 +301,6 @@ CreateOrAttachShmemStructs(void) BTreeShmemInit(); SyncScanShmemInit(); StatsShmemInit(); - AioShmemInit(); WaitLSNShmemInit(); LogicalDecodingCtlShmemInit(); } diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h index 5feea15be9e..9dd8d63b25c 100644 --- a/src/include/storage/aio_internal.h +++ b/src/include/storage/aio_internal.h @@ -20,6 +20,8 @@ #include "port/pg_iovec.h" #include "storage/aio.h" #include "storage/condition_variable.h" +#include "storage/ipc.h" +#include "storage/shmem.h" /* @@ -267,20 +269,8 @@ typedef struct IoMethodOps */ bool wait_on_fd_before_close; - /* global initialization */ - - /* - * Amount of additional shared memory to reserve for the io_method. Called - * just like a normal ipci.c style *Size() function. Optional. - */ - size_t (*shmem_size) (void); - - /* - * Initialize shared memory. First time is true if AIO's shared memory was - * just initialized, false otherwise. Optional. - */ - void (*shmem_init) (bool first_time); + ShmemCallbacks shmem_callbacks; /* * Per-backend initialization. Optional. diff --git a/src/include/storage/aio_subsys.h b/src/include/storage/aio_subsys.h index 276cb3e31c4..dd54869351f 100644 --- a/src/include/storage/aio_subsys.h +++ b/src/include/storage/aio_subsys.h @@ -20,12 +20,8 @@ /* aio_init.c */ -extern Size AioShmemSize(void); -extern void AioShmemInit(void); - extern void pgaio_init_backend(void); - /* aio.c */ extern void pgaio_error_cleanup(void); extern void AtEOXact_Aio(bool is_commit); diff --git a/src/include/storage/subsystemlist.h b/src/include/storage/subsystemlist.h index 63d1d60ae36..e8e06be30c2 100644 --- a/src/include/storage/subsystemlist.h +++ b/src/include/storage/subsystemlist.h @@ -48,3 +48,6 @@ PG_SHMEM_SUBSYSTEM(ProcSignalShmemCallbacks) PG_SHMEM_SUBSYSTEM(AsyncShmemCallbacks) PG_SHMEM_SUBSYSTEM(WaitEventCustomShmemCallbacks) PG_SHMEM_SUBSYSTEM(InjectionPointShmemCallbacks) + +/* AIO subsystem. This delegates to the method-specific callbacks */ +PG_SHMEM_SUBSYSTEM(AioShmemCallbacks) -- 2.47.3