From 744f6b4b1d317b77ccadae0eefe8a08065f48d4c Mon Sep 17 00:00:00 2001 From: Arseniy Mukhin Date: Mon, 16 Jun 2025 18:41:34 +0300 Subject: [PATCH v7 4/4] amcheck: brin_index_check() - heap all indexed This commit extends functionality of brin_index_check() with 'heap all indexed' check: we validate every index range tuple against every heap tuple within the range using withinRange function. Also, we check that fields 'has_nulls', 'all_nulls' and 'empty_range' are consistent with the range heap data. It's the most expensive part of the brin_index_check(), so it's optional. --- contrib/amcheck/amcheck--1.5--1.6.sql | 5 +- contrib/amcheck/expected/check_brin.out | 22 +- contrib/amcheck/sql/check_brin.sql | 22 +- contrib/amcheck/t/007_verify_brin.pl | 51 +++- contrib/amcheck/verify_brin.c | 313 +++++++++++++++++++++++- 5 files changed, 377 insertions(+), 36 deletions(-) diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql index 9ec046bb1cf..d4f44495bba 100644 --- a/contrib/amcheck/amcheck--1.5--1.6.sql +++ b/contrib/amcheck/amcheck--1.5--1.6.sql @@ -8,11 +8,12 @@ -- brin_index_check() -- CREATE FUNCTION brin_index_check(index regclass, - regular_pages_check boolean default false + regularpagescheck boolean default false, + heapallindexed boolean default false ) RETURNS VOID AS 'MODULE_PATHNAME', 'brin_index_check' LANGUAGE C STRICT PARALLEL RESTRICTED; -- We don't want this to be available to public -REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC; \ No newline at end of file +REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean) FROM PUBLIC; \ No newline at end of file diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out index e5fc52ed747..d8898f47fbe 100644 --- a/contrib/amcheck/expected/check_brin.out +++ b/contrib/amcheck/expected/check_brin.out @@ -5,7 +5,7 @@ $$ LANGUAGE sql; -- empty table index should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -19,7 +19,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -28,7 +28,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -42,7 +42,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -51,7 +51,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -65,7 +65,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -74,7 +74,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -88,7 +88,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -97,7 +97,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -113,7 +113,7 @@ SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, rando FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -122,7 +122,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql index b36af37fe03..b4137799c57 100644 --- a/contrib/amcheck/sql/check_brin.sql +++ b/contrib/amcheck/sql/check_brin.sql @@ -7,7 +7,7 @@ $$ LANGUAGE sql; -- empty table index should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -17,12 +17,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -34,12 +34,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -51,12 +51,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -68,12 +68,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -87,12 +87,12 @@ FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl index 2c62b76cc70..51bfed7e273 100644 --- a/contrib/amcheck/t/007_verify_brin.pl +++ b/contrib/amcheck/t/007_verify_brin.pl @@ -200,6 +200,55 @@ my @tests = ( return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); }, expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)") + }, + { + # range is marked as empty_range, but heap has some data for the range + + find => pack('LCC', 0, 0x88, 0x03), + replace => pack('LCC', 0, 0xA8, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null);); + }, + expected => wrap('range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)') + }, + { + # range hasnulls & allnulls are false, but heap contains null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x00), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)') + }, + { + # range allnulls is true, but heap contains non-null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)') + }, + { + # consistent function return FALSE for the valid heap value + # replace "ccccc" with "bbbbb" so that min_max index was too narrow + + find => 'ccccc', + replace => 'bbbbb', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc');); + }, + expected => wrap('heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)') } ); @@ -241,7 +290,7 @@ foreach my $test_struct (@tests) { $node->start; foreach my $test_struct (@tests) { - my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true))); + my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true))); like($stderr, $test_struct->{expected}); } diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c index e9327f2f895..a446a210030 100644 --- a/contrib/amcheck/verify_brin.c +++ b/contrib/amcheck/verify_brin.c @@ -38,7 +38,8 @@ typedef struct BrinCheckState /* Check arguments */ - bool regular_pages_check; + bool regularpagescheck; + bool heapallindexed; /* BRIN check common fields */ @@ -67,6 +68,25 @@ typedef struct BrinCheckState Page regpage; OffsetNumber regpageoffset; + /* Heap all indexed check fields */ + + BrinRevmap *revmap; + Buffer buf; + FmgrInfo *withinRangeFn; + double range_cnt; + /* first block of the next range */ + BlockNumber nextrangeBlk; + + /* + * checkable_range shows if current range could be checked and dtup + * contains valid index tuple for the range. It could be false if the + * current range is not summarized, or it's placeholder, or it's just a + * beginning of the check + */ + bool checkable_range; + BrinMemTuple *dtup; + MemoryContext rangeCtx; + MemoryContext heaptupleCtx; } BrinCheckState; static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly); @@ -87,6 +107,17 @@ static bool revmap_points_to_index_tuple(BrinCheckState * state); static ItemId PageGetItemIdCareful(BrinCheckState * state); +static void check_heap_all_indexed(BrinCheckState * state); + +static void brin_check_callback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate); + +static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid); + static void brin_check_ereport(BrinCheckState * state, const char *fmt); static void revmap_item_ereport(BrinCheckState * state, const char *fmt); @@ -95,6 +126,7 @@ static void index_tuple_ereport(BrinCheckState * state, const char *fmt); static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt); +static void all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message); Datum brin_index_check(PG_FUNCTION_ARGS) @@ -102,7 +134,8 @@ brin_index_check(PG_FUNCTION_ARGS) Oid indrelid = PG_GETARG_OID(0); BrinCheckState *state = palloc0(sizeof(BrinCheckState)); - state->regular_pages_check = PG_GETARG_BOOL(1); + state->regularpagescheck = PG_GETARG_BOOL(1); + state->heapallindexed = PG_GETARG_BOOL(2); amcheck_lock_relation_and_check(indrelid, BRIN_AM_OID, @@ -127,9 +160,12 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl state->bdesc = brin_build_desc(idxrel); state->natts = state->bdesc->bd_tupdesc->natts; - check_brin_index_structure(state); + if (state->heapallindexed) + { + check_heap_all_indexed(state); + } brin_free_desc(state->bdesc); } @@ -160,8 +196,13 @@ check_brin_index_structure(BrinCheckState * state) /* Check revmap first, blocks: [1, lastRevmapPage] */ check_revmap(state); - /* Check regular pages, blocks: [lastRevmapPage + 1, idxnblocks] */ - check_regular_pages(state); + + if (state->regularpagescheck) + { + /* Check regular pages, blocks: [lastRevmapPage + 1, idxnblocks] */ + check_regular_pages(state); + } + } /* Meta page check and save some data for the further check */ @@ -614,11 +655,6 @@ check_regular_pages(BrinCheckState * state) ReadStreamBlockNumberCB stream_cb; BlockRangeReadStreamPrivate stream_data; - if (!state->regular_pages_check) - { - return; - } - /* reset state */ state->revmapBlk = InvalidBlockNumber; state->revmapbuf = InvalidBuffer; @@ -628,7 +664,6 @@ check_regular_pages(BrinCheckState * state) state->regpageoffset = InvalidOffsetNumber; state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel); - /* * Prepare stream data for regular pages walk. It is safe to use batchmode * as block_range_read_stream_cb takes no locks. @@ -788,6 +823,246 @@ PageGetItemIdCareful(BrinCheckState * state) return itemid; } +/* + * Check that every heap tuple are consistent with the index. + * + * We use withinRange function for every nonnull value (in case + * of io_regular_nulls = false we use withinRange function for null values too). + * + * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls' + * are not too "narrow" for each range, which means: + * 1) has_nulls = false, but we see null value (only for oi_regular_nulls is true) + * 2) all_nulls = true, but we see nonnull value. + * 3) empty_range = true, but we see tuple within the range. + * + * We use allowSync = false, because this way + * we process full ranges one by one from the first range. + * It's not necessary, but makes the code simpler and this way + * we need to fetch every index tuple only once. + */ +static void +check_heap_all_indexed(BrinCheckState * state) +{ + Relation idxrel = state->idxrel; + Relation heaprel = state->heaprel; + double reltuples; + IndexInfo *indexInfo; + + /* heap all indexed check fields initialization */ + + state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange); + state->dtup = brin_new_memtuple(state->bdesc); + state->checkable_range = false; + state->withinRangeFn = palloc0_array(FmgrInfo, state->natts); + state->range_cnt = 0; + /* next range is the first range in the beginning */ + state->nextrangeBlk = 0; + state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check range context", + ALLOCSET_DEFAULT_SIZES); + state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check tuple context", + ALLOCSET_DEFAULT_SIZES); + + /* Prepare withinRange function for each attribute */ + for (AttrNumber attno = 1; attno <= state->natts; attno++) + { + if (RegProcedureIsValid(index_getprocid(state->idxrel, attno, BRIN_PROCNUM_WITHINRANGE))) + { + FmgrInfo *fn = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_WITHINRANGE); + + fmgr_info_copy(&state->withinRangeFn[attno - 1], fn, CurrentMemoryContext); + } + else + { + /* + * If there is no withinRange function for the attribute, notice + * user about it. We continue heap all indexed check even for + * indexes with just one attribute as even without support + * function some errors could be detected. + */ + ereport(NOTICE, + errmsg("Missing support function %d for attribute %d of index \"%s\". " + "Skip heap all indexed check for this attribute.", + BRIN_PROCNUM_WITHINRANGE, + attno, + RelationGetRelationName(state->idxrel) + )); + } + + } + + indexInfo = BuildIndexInfo(idxrel); + + /* + * Use snapshot to check only those tuples that are guaranteed to be + * indexed already. Using SnapshotAny would make it more difficult to say + * if there is a corruption or checked tuple just haven't been indexed + * yet. + */ + indexInfo->ii_Concurrent = true; + reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true, + brin_check_callback, (void *) state, NULL); + + elog(DEBUG3, "ranges were checked: %f", state->range_cnt); + elog(DEBUG3, "scan total tuples: %f", reltuples); + + if (state->buf != InvalidBuffer) + ReleaseBuffer(state->buf); + + brinRevmapTerminate(state->revmap); + MemoryContextDelete(state->rangeCtx); + MemoryContextDelete(state->heaptupleCtx); +} + +/* + * We walk from the first range (blkno = 0) to the last as the scan proceed. + * For every heap tuple we check if we are done with the current range, and we need to move further + * to the current heap tuple's range. While moving to the next range we check that it's not empty (because + * we have at least one tuple for this range). + * Every heap tuple are checked to be consistent with the range it belongs to. + * In case of unsummarized ranges and placeholders we skip all checks. + * + * While moving, we may jump over some ranges, + * but it's okay because we would not be able to check them anyway. + * We also can't say whether skipped ranges should be marked as empty or not, + * since it's possible that there were some tuples before that are now deleted. + * + */ +static void +brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate) +{ + BrinCheckState *state; + BlockNumber heapblk; + + state = (BrinCheckState *) brstate; + heapblk = ItemPointerGetBlockNumber(tid); + + /* If we went beyond the current range let's fetch new range */ + if (heapblk >= state->nextrangeBlk) + { + BrinTuple *tup; + BrinTuple *tupcopy = NULL; + MemoryContext oldCtx; + OffsetNumber off; + Size size; + Size btupsz = 0; + + MemoryContextReset(state->rangeCtx); + oldCtx = MemoryContextSwitchTo(state->rangeCtx); + + state->range_cnt++; + + /* Move to the range that contains current heap tuple */ + tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf, + &off, &size, BUFFER_LOCK_SHARE); + + if (tup) + { + tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz); + LockBuffer(state->buf, BUFFER_LOCK_UNLOCK); + state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup); + + /* We can't check placeholder ranges */ + state->checkable_range = !state->dtup->bt_placeholder; + } + else + { + /* We can't check unsummarized ranges. */ + state->checkable_range = false; + } + + /* + * Update nextrangeBlk so we know when we are done with the current + * range + */ + state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange; + + MemoryContextSwitchTo(oldCtx); + + /* Range must not be empty */ + if (state->checkable_range && state->dtup->bt_empty_range) + { + all_consist_ereport(state, tid, "range is marked as empty but contains qualified live tuples"); + } + + } + + /* Check tuple is consistent with the index */ + if (state->checkable_range) + { + check_heap_tuple(state, values, isnull, tid); + } + +} + +/* + * We check hasnulls flags for null values and oi_regular_nulls = true, + * check allnulls is false for all nonnull values not matter oi_regular_nulls is true or not, + * We use withinRangeFn for all nonnull values and null values if io_regular_nulls = false. + */ +static void +check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid) +{ + int attindex; + BrinMemTuple *dtup = state->dtup; + BrinDesc *bdesc = state->bdesc; + MemoryContext oldCtx; + + Assert(state->checkable_range); + + MemoryContextReset(state->heaptupleCtx); + oldCtx = MemoryContextSwitchTo(state->heaptupleCtx); + + /* check every index attribute */ + for (attindex = 0; attindex < state->natts; attindex++) + { + BrinValues *bval; + Datum withinRangeFnResult; + bool withinRange; + bool oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls; + + bval = &dtup->bt_columns[attindex]; + + /* + * Use hasnulls flag for oi_regular_nulls is true. Otherwise, delegate + * check to withinRangeFn + */ + if (nulls[attindex] && oi_regular_nulls) + { + /* We have null value, so hasnulls or allnulls must be true */ + if (!(bval->bv_hasnulls || bval->bv_allnulls)) + { + all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value"); + } + continue; + } + + /* If we have a nonnull value allnulls should be false */ + if (!nulls[attindex] && bval->bv_allnulls) + { + all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value"); + } + + /* If oi_regular_nulls = true we should never get there with null */ + Assert(!oi_regular_nulls || !nulls[attindex]); + + withinRangeFnResult = FunctionCall4Coll(&state->withinRangeFn[attindex], + state->idxrel->rd_indcollation[attindex], + PointerGetDatum(bdesc), + PointerGetDatum(bval), + values[attindex], + nulls[attindex]); + + withinRange = DatumGetBool(withinRangeFnResult); + if (!withinRange) + { + all_consist_ereport(state, tid, "heap tuple inconsistent with index"); + } + } + + MemoryContextSwitchTo(oldCtx); +} /* Report without any additional info */ static void @@ -853,3 +1128,19 @@ revmap_item_ereport(BrinCheckState * state, const char *fmt) state->revmapBlk, state->revmapidx))); } + +/* Report with range blkno, heap tuple info */ +static void +all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message) +{ + Assert(state->rangeBlkno != InvalidBlockNumber); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)", + RelationGetRelationName(state->idxrel), + message, + state->dtup->bt_blkno, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)))); +} -- 2.43.0