From 7031072c40494b151d9849160d403f719c60631c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 27 Mar 2026 13:08:26 -0400 Subject: [PATCH v9 2/4] test_aio: Add read_stream test infrastructure & tests While we have a lot of indirect coverage of read streams, there are corner cases that are hard to test when only indirectly controlling and observing the read stream. This commit adds an SQL callable SRF interface for a read stream and uses that in a few tests. To make some of the tests possible, the injection point infrastructure in test_aio had to be expanded to allow blocking IO completion. Author: Andres Freund Reviewed-by: Nazir Bilal Yavuz Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/test/modules/test_aio/meson.build | 1 + .../modules/test_aio/t/004_read_stream.pl | 275 ++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 19 +- src/test/modules/test_aio/test_aio.c | 336 +++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 5 files changed, 582 insertions(+), 50 deletions(-) create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 18a797f3a3b..909f81d96c1 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -33,6 +33,7 @@ tests += { 't/001_aio.pl', 't/002_io_workers.pl', 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 00000000000..0d123ac0ed5 --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,275 @@ +# Copyright (c) 2025-2026, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +TestAio::configure($node); + +$node->append_conf( + 'postgresql.conf', qq( +max_connections=8 +io_method=worker +)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + $node->adjust_conf('postgresql.conf', 'io_method', $method); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe(qq/SET io_combine_limit = 1/); + + # test miss of the same block twice in a row + $psql->query_safe(qq/SELECT evict_rel('largeish');/); + + # block 0 grows the distance enough that the stream will look ahead and try + # to start a pending read for block 2 (and later block 4) twice before + # returning any buffers. + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]);/); + + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe(qq/SELECT evict_rel('largeish');/); + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/); + ok(1, "$io_method: stream accessing same block"); + + # Test repeated blocks with a temp table, using invalidate_rel_block() + # to evict individual local buffers. + $psql->query_safe( + qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10); + INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/); + + # Evict the specific blocks we'll request to force misses + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/); + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/); + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/); + + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: temp stream missing the same block repeatedly"); + + # Now the blocks are cached, so repeated access should be hits + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: temp stream hitting the same block repeatedly"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + # Block 5 is undergoing IO in session b, so session a will move on to start + # a new IO for block 7. + $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, + qq/$io_method: read stream encounters succeeding IO by another backend/ + ); + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->query_safe( + qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'), + pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', + qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr}, + qr/ERROR.*could not read blocks 5\.\.5/); + ok(1, "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/); + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>2, nblocks=>3);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', + qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + # Blocks 2 and 4 are undergoing IO initiated by session b + $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffer read in one IO/); + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + is($node->safe_psql('postgres', 'SHOW io_method'), + $io_method, "$io_method: io_method set correctly"); + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index 86beb563b6a..4a5a379b3c5 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl RETURNS SETOF record STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + /* * Handle related functions @@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT NULL, blockno int4 DEFAULT NULL) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT NULL) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index 3e486a5806e..cb614551914 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -27,13 +27,18 @@ #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include "storage/read_stream.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplestore.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -41,13 +46,31 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + BlockNumber completion_wait_blockno; + pid_t completion_wait_pid; + uint32 completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -88,11 +111,15 @@ test_aio_shmem_startup(void) /* First time through, initialize */ inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + inj_io_error_state->enabled_completion_wait = false; + + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -388,7 +415,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -819,6 +846,85 @@ read_buffers(PG_FUNCTION_ARGS) } +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. + */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -889,15 +995,111 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *io_proc; + int32 io_pid; + int32 inj_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if (!inj_io_error_state->short_read_result_set) + return false; + + io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + io_pid = io_proc->pid; + inj_pid = inj_io_error_state->short_read_pid; + + if (inj_pid != InvalidPid && inj_pid != io_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *io_proc; + int32 io_pid; + PgAioTargetData *td; + int32 inj_pid; + BlockNumber io_blockno; + BlockNumber inj_blockno; + Oid inj_relfilenode; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + io_pid = io_proc->pid; + inj_pid = inj_io_error_state->completion_wait_pid; + + if (inj_pid != InvalidPid && inj_pid != io_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + inj_relfilenode = inj_io_error_state->completion_wait_relfilenode; + if (inj_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_relfilenode) + return false; + + inj_blockno = inj_io_error_state->completion_wait_blockno; + io_blockno = td->smgr.blockNum; + if (inj_blockno != InvalidBlockNumber && + !(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks))) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) arg; @@ -906,58 +1108,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. - */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; + uint32 ok_part = new_result - processed; - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); } - - ioh->result = new_result; } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -971,6 +1171,42 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); + inj_io_error_state->completion_wait_blockno = + PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = InvalidPid; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + inj_io_error_state->completion_wait_blockno = InvalidBlockNumber; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -980,6 +1216,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 112653c1680..b2c7c9e6f7c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -309,6 +309,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.53.0.1.gb2826b52eb