From d510b46530bd219c9767e42e02f093d3460babef Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 10 Sep 2025 14:00:02 -0400 Subject: [PATCH v6 2/8] test_aio: Add read_stream test infrastructure & tests Author: Andres Freund Reviewed-by: Nazir Bilal Yavuz Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/test/modules/test_aio/meson.build | 1 + .../modules/test_aio/t/004_read_stream.pl | 286 +++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 24 +- src/test/modules/test_aio/test_aio.c | 346 +++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 5 files changed, 607 insertions(+), 51 deletions(-) create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 18a797f3a3b..909f81d96c1 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -33,6 +33,7 @@ tests += { 't/001_aio.pl', 't/002_io_workers.pl', 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 00000000000..17a68e35c1d --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,286 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +$node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +max_connections=8 +io_method=worker +)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + $node->adjust_conf('postgresql.conf', 'io_method', $method); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe( + qq/ SET io_combine_limit = 1; /); + + # test miss of the same block twice in a row + $psql->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + # block 0 grows the distance enough that the stream will look ahead and try + # to start a pending read for block 2 (and later block 4) twice before + # returning any buffers. + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]); /); + + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]); /); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe( + qq/ SELECT evict_rel('largeish'); /); + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); /); + ok(1, "$io_method: stream accessing same block"); + + # Test repeated blocks with a temp table, using invalidate_rel_block() + # to evict individual local buffers. + $psql->query_safe( + qq/ CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10); + INSERT INTO largeish_temp(k) SELECT generate_series(1, 200); /); + + # Evict the specific blocks we'll request to force misses + $psql->query_safe( + qq/ SELECT invalidate_rel_block('largeish_temp', 0); /); + $psql->query_safe( + qq/ SELECT invalidate_rel_block('largeish_temp', 2); /); + $psql->query_safe( + qq/ SELECT invalidate_rel_block('largeish_temp', 4); /); + + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]); /); + ok(1, "$io_method: temp stream missing the same block repeatedly"); + + # Now the blocks are cached, so repeated access should be hits + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]); /); + ok(1, "$io_method: temp stream hitting the same block repeatedly"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + # Block 5 is undergoing IO in session b, so session a will move on to start + # a new IO for block 7. + $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, qq/$io_method: read stream encounters succeeding IO by another backend/); + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_short_read_attach(-errno_from_string('EIO'), + pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + pump_until( + $psql_b->{run}, $psql_b->{timeout}, + \$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/); + ok(1, "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/); + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>2, nblocks=>3);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + # Blocks 2 and 4 are undergoing IO initiated by session a + $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', + qq/ SELECT inj_io_completion_continue() /); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffer read in one IO/); + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + is($node->safe_psql('postgres', 'SHOW io_method'), + $io_method, "$io_method: io_method set correctly"); + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e..1cc4734a746 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll( RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION evict_rel(rel regclass) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Handle related functions @@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index b1aa8af9ec0..061f0c9f92a 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -19,17 +19,26 @@ #include "postgres.h" #include "access/relation.h" +#include "catalog/pg_type.h" #include "fmgr.h" +#include "funcapi.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include "storage/read_stream.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" +#include "utils/tuplestore.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -37,13 +46,30 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + pid_t completion_wait_pid; + uint32 completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -85,10 +111,13 @@ test_aio_shmem_startup(void) inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); + #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -384,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -458,6 +487,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(evict_rel); +Datum +evict_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation rel; + int32 buffers_evicted, + buffers_flushed, + buffers_skipped; + + rel = relation_open(relid, AccessExclusiveLock); + + EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed, + &buffers_skipped); + + relation_close(rel, AccessExclusiveLock); + + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(invalidate_rel_block); Datum invalidate_rel_block(PG_FUNCTION_ARGS) @@ -610,6 +660,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. + */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks + 1); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -680,15 +810,98 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if (!inj_io_error_state->short_read_result_set) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->short_read_pid != 0 && + inj_io_error_state->short_read_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->completion_wait_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->completion_wait_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) arg; @@ -697,58 +910,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. - */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; - - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } - } + uint32 ok_part = new_result - processed; - ioh->result = new_result; + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); + } } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -762,6 +973,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -771,6 +1015,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 174e2798443..340662cf72c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -305,6 +305,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.43.0