From b63a58c68180872d882b5b3081ea40654e9062b3 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Veldanda Date: Mon, 14 Apr 2025 21:45:42 +0000 Subject: [PATCH v11 3/7] Zstd dictionary training process --- doc/src/sgml/ref/create_type.sgml | 21 +- src/backend/catalog/heap.c | 10 +- src/backend/catalog/pg_type.c | 11 +- src/backend/catalog/pg_zstd_dictionaries.c | 490 ++++++++++++++++++++- src/backend/commands/analyze.c | 7 +- src/backend/commands/typecmds.c | 107 ++++- src/backend/utils/cache/typcache.c | 2 + src/include/catalog/pg_proc.dat | 35 ++ src/include/catalog/pg_type.dat | 4 +- src/include/catalog/pg_type.h | 8 +- src/include/parser/analyze.h | 5 + src/include/utils/typcache.h | 1 + src/test/regress/expected/oidjoins.out | 1 + src/tools/pgindent/typedefs.list | 3 + 14 files changed, 680 insertions(+), 25 deletions(-) diff --git a/doc/src/sgml/ref/create_type.sgml b/doc/src/sgml/ref/create_type.sgml index 994dfc65268..5f9d61db004 100644 --- a/doc/src/sgml/ref/create_type.sgml +++ b/doc/src/sgml/ref/create_type.sgml @@ -56,6 +56,7 @@ CREATE TYPE name ( [ , ELEMENT = element ] [ , DELIMITER = delimiter ] [ , COLLATABLE = collatable ] + [ , ZSTD_SAMPLING = zstd_sampling_function ] ) CREATE TYPE name @@ -211,7 +212,8 @@ CREATE TYPE name type_modifier_input_function, type_modifier_output_function, analyze_function, and - subscript_function + subscript_function, and + zstd_sampling_function are optional. Generally these functions have to be coded in C or another low-level language. @@ -491,6 +493,15 @@ CREATE TYPE name make use of the collation information; this does not happen automatically merely by marking the type collatable. + + + The optional zstd_sampling_function + performs type-specific sampling for a column of the corresponding data type. + By default, for jsonb data type, the function std_zstd_sampling_for_jsonb is defined. It attempts to gather samples for a jsonb + and returns a sample buffer for zstd dictionary training. The training function must be declared to accept two arguments of type internal and return an internal result. + The detailed information for zstd training function is provided in src/backend/catalog/pg_zstd_dictionaries.c. + + @@ -846,6 +857,14 @@ CREATE TYPE name + + zstd_sampling + + + Specifies the name of a function that performs sampling on specific type. + + + diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index fbaed5359ad..9d3a8536e23 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1071,7 +1071,10 @@ AddNewRelationType(const char *typeName, -1, /* typmod */ 0, /* array dimensions for typBaseType */ false, /* Type NOT NULL */ - InvalidOid); /* rowtypes never have a collation */ + InvalidOid, /* rowtypes never have a collation */ + F_COMPOSITE_TYPZSTDSAMPLING /* generate dictionary + * procedure - default */ + ); } /* -------------------------------- @@ -1394,7 +1397,10 @@ heap_create_with_catalog(const char *relname, -1, /* typmod */ 0, /* array dimensions for typBaseType */ false, /* Type NOT NULL */ - InvalidOid); /* rowtypes never have a collation */ + InvalidOid, /* rowtypes never have a collation */ + F_ARRAY_TYPZSTDSAMPLING /* generate dictionary procedure - + * default */ + ); pfree(relarrayname); } diff --git a/src/backend/catalog/pg_type.c b/src/backend/catalog/pg_type.c index b36f81afb9d..8ff9b4d327f 100644 --- a/src/backend/catalog/pg_type.c +++ b/src/backend/catalog/pg_type.c @@ -120,6 +120,7 @@ TypeShellMake(const char *typeName, Oid typeNamespace, Oid ownerId) values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(-1); values[Anum_pg_type_typndims - 1] = Int32GetDatum(0); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_type_typzstdsampling - 1] = ObjectIdGetDatum(InvalidOid); nulls[Anum_pg_type_typdefaultbin - 1] = true; nulls[Anum_pg_type_typdefault - 1] = true; nulls[Anum_pg_type_typacl - 1] = true; @@ -223,7 +224,8 @@ TypeCreate(Oid newTypeOid, int32 typeMod, int32 typNDims, /* Array dimensions for baseType */ bool typeNotNull, - Oid typeCollation) + Oid typeCollation, + Oid zstdSamplingProcedure) { Relation pg_type_desc; Oid typeObjectId; @@ -378,6 +380,7 @@ TypeCreate(Oid newTypeOid, values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(typeMod); values[Anum_pg_type_typndims - 1] = Int32GetDatum(typNDims); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(typeCollation); + values[Anum_pg_type_typzstdsampling - 1] = ObjectIdGetDatum(zstdSamplingProcedure); /* * initialize the default binary value for this type. Check for nulls of @@ -679,6 +682,12 @@ GenerateTypeDependencies(HeapTuple typeTuple, add_exact_object_address(&referenced, addrs_normal); } + if (OidIsValid(typeForm->typzstdsampling)) + { + ObjectAddressSet(referenced, ProcedureRelationId, typeForm->typzstdsampling); + add_exact_object_address(&referenced, addrs_normal); + } + if (OidIsValid(typeForm->typsubscript)) { ObjectAddressSet(referenced, ProcedureRelationId, typeForm->typsubscript); diff --git a/src/backend/catalog/pg_zstd_dictionaries.c b/src/backend/catalog/pg_zstd_dictionaries.c index d5e965c34d0..58964a600a3 100644 --- a/src/backend/catalog/pg_zstd_dictionaries.c +++ b/src/backend/catalog/pg_zstd_dictionaries.c @@ -14,10 +14,498 @@ #include "postgres.h" #include "fmgr.h" +#include "access/table.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_class_d.h" #include "catalog/pg_zstd_dictionaries.h" #include "catalog/pg_zstd_dictionaries_d.h" +#include "catalog/pg_depend.h" +#include "catalog/namespace.h" +#include "catalog/pg_attribute.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" #include "utils/syscache.h" -#include "varatt.h" +#include "access/toast_compression.h" +#include "utils/attoptcache.h" +#include "parser/analyze.h" +#include "nodes/makefuncs.h" +#include "access/reloptions.h" +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/sdir.h" +#include "utils/lsyscache.h" +#include "utils/relcache.h" +#include "utils/memutils.h" +#include "utils/varlena.h" +#include "nodes/pg_list.h" +#include "utils/array.h" +#include "utils/rangetypes.h" +#include "utils/multirangetypes.h" +#include "utils/snapmgr.h" +#include "access/xact.h" + +#ifdef USE_ZSTD +#include +#include +#endif + +#define TARGET_ROWS 30000 + +typedef struct ZstdTrainingData ZstdTrainingData; + +struct ZstdTrainingData +{ + char *sample_buffer; /* Pointer to the raw sample buffer */ + size_t *sample_sizes; /* Array of sample sizes */ + size_t nitems; /* Number of sample sizes */ + size_t total_size; /* Running total sample size */ +}; + +static bool build_zstd_dictionary_internal(Oid relid, AttrNumber attno); +static Oid GetNewDictId(Relation relation); +static bool append_sample(ZstdTrainingData *dict, const char *sample, size_t sample_size); +static bool sample_varlena_datum(Datum datum, ZstdTrainingData *dict); +static int cleanup_unused_zstd_dictionaries_internal(void); + +/* + * build_zstd_dictionary_internal + * 1) Validate that the given (relid, attno) can have a Zstd compression enabled on heap relation + * 2) Call the type-specific sampling procedure + * 3) Train a dictionary via ZDICT_trainFromBuffer() + * 4) Insert dictionary into pg_zstd_dictionaries + * 5) Update pg_attribute.attoptions with new dictid + */ +pg_attribute_unused() +static bool +build_zstd_dictionary_internal(Oid relid, AttrNumber attno) +{ +#ifndef USE_ZSTD + return false; +#else + Relation rel; + Form_pg_attribute att; + AttributeOpts *attopt; + TypeCacheEntry *typentry; + ZstdTrainingData dict = {0}; + HeapTuple sample_rows[TARGET_ROWS]; + int num_sampled; + double totalrows = 0, + totaldeadrows = 0; + int i; + size_t dict_size; + void *dict_data; + Oid dictid; + bytea *dict_bytea; + Relation catalogRel, + attRel; + HeapTuple tup, + atttup, + newtuple; + Datum values[Natts_pg_zstd_dictionaries]; + bool nulls[Natts_pg_zstd_dictionaries]; + Datum attoptionsDatum, + newOptions; + bool isnull; + Datum repl_val[Natts_pg_attribute]; + bool repl_null[Natts_pg_attribute]; + bool repl_repl[Natts_pg_attribute]; + DefElem *def; + ObjectAddress dictObj, + relObj; + + /* Open relation, verify regular table */ + rel = table_open(relid, AccessShareLock); + if (rel->rd_rel->relkind != RELKIND_RELATION) + goto fail; + + att = TupleDescAttr(RelationGetDescr(rel), attno - 1); + if (att->attcompression != TOAST_ZSTD_COMPRESSION) + goto fail; + + /* Check attoptions for user-requested dictionary size, etc. */ + attopt = get_attribute_options(relid, attno); + if (attopt && attopt->zstd_dict_size == 0) + goto fail; + + /* + * 2) Look up the type's custom dictionary builder function We'll call it + * to get sample training data. + */ + typentry = lookup_type_cache(att->atttypid, 0); + if (typentry->typlen != -1 || !OidIsValid(typentry->typzstdsampling)) + goto fail; + + num_sampled = acquire_sample_rows(rel, 0, sample_rows, TARGET_ROWS, &totalrows, &totaldeadrows); + if (num_sampled == 0) + goto fail; + + for (i = 0; i < num_sampled; i++) + { + Datum value; + + value = heap_getattr(sample_rows[i], attno, RelationGetDescr(rel), &isnull); + + if (!isnull && !DatumGetBool(OidFunctionCall2(typentry->typzstdsampling, value, PointerGetDatum(&dict)))) + break; + } + + if (dict.nitems == 0) + goto fail; + + /* ZSTD Dictionary training */ + dict_size = attopt ? attopt->zstd_dict_size : DEFAULT_ZSTD_DICT_SIZE; + dict_data = palloc(dict_size); + + dict_size = ZDICT_trainFromBuffer(dict_data, dict_size, dict.sample_buffer, dict.sample_sizes, dict.nitems); + if (ZDICT_isError(dict_size)) + { + elog(LOG, "Zstd dictionary training failed: %s", ZDICT_getErrorName(dict_size)); + goto cleanup_dict; + } + + /* Insert dictionary into catalog */ + dict_bytea = palloc(VARHDRSZ + dict_size); + SET_VARSIZE(dict_bytea, VARHDRSZ + dict_size); + memcpy(VARDATA(dict_bytea), dict_data, dict_size); + + catalogRel = table_open(ZstdDictionariesRelationId, ShareRowExclusiveLock); + dictid = GetNewDictId(catalogRel); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + values[Anum_pg_zstd_dictionaries_dictid - 1] = ObjectIdGetDatum(dictid); + values[Anum_pg_zstd_dictionaries_dict - 1] = PointerGetDatum(dict_bytea); + + tup = heap_form_tuple(RelationGetDescr(catalogRel), values, nulls); + CatalogTupleInsert(catalogRel, tup); + + heap_freetuple(tup); + pfree(dict_bytea); + table_close(catalogRel, NoLock); + + /* + * Update pg_attribute.attoptions with "dictid" => dictid so the column + * knows which dictionary to use at compression time. + */ + attRel = table_open(AttributeRelationId, RowExclusiveLock); + atttup = SearchSysCacheAttNum(relid, attno); + if (!HeapTupleIsValid(atttup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column number %d of relation \"%u\" does not exist", + attno, relid))); + + /* Build new attoptions with dictid=... */ + def = makeDefElem("dictid", + (Node *) makeString(psprintf("%u", dictid)), + -1); + + attoptionsDatum = SysCacheGetAttr(ATTNUM, atttup, + Anum_pg_attribute_attoptions, + &isnull); + newOptions = transformRelOptions(isnull ? (Datum) 0 : attoptionsDatum, + list_make1(def), + NULL, NULL, + false, false); + /* Validate them (throws error if invalid) */ + (void) attribute_reloptions(newOptions, true); + + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); + + repl_val[Anum_pg_attribute_attoptions - 1] = newOptions; + repl_repl[Anum_pg_attribute_attoptions - 1] = true; + + newtuple = heap_modify_tuple(atttup, RelationGetDescr(attRel), repl_val, repl_null, repl_repl); + CatalogTupleUpdate(attRel, &newtuple->t_self, newtuple); + + heap_freetuple(newtuple); + ReleaseSysCache(atttup); + table_close(attRel, NoLock); + + /* Record dependency, relation is depended on this dictionary */ + ObjectAddressSet(dictObj, ZstdDictionariesRelationId, dictid); + ObjectAddressSet(relObj, RelationRelationId, relid); + recordDependencyOn(&relObj, &dictObj, DEPENDENCY_NORMAL); + + pfree(dict_data); + table_close(rel, NoLock); + return true; + +cleanup_dict: + pfree(dict_data); +fail: + table_close(rel, NoLock); + + return false; +#endif +} + +/* + * Acquire a new unique DictId for a relation. + * + * Assumes the relation is already locked with ShareRowExclusiveLock, + * ensuring that concurrent transactions cannot generate duplicate DictIds. + */ +pg_attribute_unused() +static Oid +GetNewDictId(Relation dictRel) +{ + Relation idxRel; + Oid maxDictId = InvalidOid; + Oid newDictId; + SysScanDesc scan; + HeapTuple tuple; + + /* + * Open the index to read existing DictId values. + */ + idxRel = index_open(ZstdDictidIndexId, AccessShareLock); + + /* + * Retrieve the maximum existing DictId by scanning in reverse order. This + * relies on the index being sorted ascending on dictid, so scanning + * backward finds the largest value first. + */ + scan = systable_beginscan_ordered(dictRel, + idxRel, + SnapshotSelf, + 0, NULL); + + tuple = systable_getnext_ordered(scan, BackwardScanDirection); + if (HeapTupleIsValid(tuple)) + { + Datum value; + bool isNull; + + value = heap_getattr(tuple, + Anum_pg_zstd_dictionaries_dictid, + RelationGetDescr(dictRel), + &isNull); + if (!isNull) + maxDictId = DatumGetObjectId(value); + } + systable_endscan_ordered(scan); + index_close(idxRel, AccessShareLock); + + /* Propose new DictId one higher than the max found. */ + newDictId = maxDictId + 1; + Assert(newDictId != InvalidDictId); + + if (newDictId <= InvalidDictId || newDictId > UINT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("dictid is not in expected range"))); + + return newDictId; +} + +/* + * append_sample + * + * Given a sample (raw bytes) and its size, append it to the training data. + * This function re-allocates (or allocates) the contiguous sample_buffer and + * the sample_sizes array. It returns true if the new total allocation does not + * exceed MaxAllocSize, false otherwise. + */ +static bool +append_sample(ZstdTrainingData *dict, const char *sample, size_t sample_size) +{ + if ((dict->total_size + sample_size) > MaxAllocSize) + return false; + + if (dict->sample_buffer == NULL) + dict->sample_buffer = palloc(sample_size); + else + dict->sample_buffer = repalloc(dict->sample_buffer, dict->total_size + sample_size); + + memcpy(dict->sample_buffer + dict->total_size, sample, sample_size); + dict->total_size += sample_size; + + if (dict->sample_sizes == NULL) + dict->sample_sizes = palloc(sizeof(size_t)); + else + dict->sample_sizes = repalloc(dict->sample_sizes, (dict->nitems + 1) * sizeof(size_t)); + + dict->sample_sizes[dict->nitems++] = sample_size; + + return true; +} + +/* Common helper for jsonb and text */ +static bool +sample_varlena_datum(Datum datum, ZstdTrainingData *dict) +{ + struct varlena *attr = (struct varlena *) PG_DETOAST_DATUM(datum); + + return append_sample(dict, VARDATA_ANY(attr), VARSIZE_ANY_EXHDR(attr)); +} + +/* + * std_zstd_sampling_for_jsonb + * + * Processes a single jsonb sample. + * It detoasts the datum, obtains the raw sample (excluding the header), + * and appends it into the provided ZstdTrainingData structure. + * + * Returns true if the sample was successfully appended, false otherwise. + */ +Datum +std_zstd_sampling_for_jsonb(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(sample_varlena_datum(PG_GETARG_DATUM(0), (ZstdTrainingData *) PG_GETARG_POINTER(1))); +} + +/* + * std_zstd_sampling_for_text + * + * Processes a single text sample. + * It detoasts the datum, obtains the raw sample (excluding the header), + * and appends it into the provided ZstdTrainingData structure. + * + * Returns true if the sample was successfully appended, false otherwise. + */ +Datum +std_zstd_sampling_for_text(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(sample_varlena_datum(PG_GETARG_DATUM(0), (ZstdTrainingData *) PG_GETARG_POINTER(1))); +} + +/* + * array_typzstdsampling -- typzstdsampling function for array columns + */ +Datum +array_typzstdsampling(PG_FUNCTION_ARGS) +{ + ArrayType *array = DatumGetArrayTypeP(PG_GETARG_DATUM(0)); + ZstdTrainingData *dict = (ZstdTrainingData *) PG_GETARG_POINTER(1); + Datum *elements; + bool *nulls; + int nelems; + TypeCacheEntry *elemCache = lookup_type_cache(ARR_ELEMTYPE(array), 0); + + if (!OidIsValid(elemCache->typzstdsampling)) + PG_RETURN_BOOL(false); + + deconstruct_array(array, ARR_ELEMTYPE(array), elemCache->typlen, elemCache->typbyval, elemCache->typalign, &elements, &nulls, &nelems); + + for (int j = 0; j < nelems; j++) + if (!nulls[j] && !DatumGetBool(OidFunctionCall2(elemCache->typzstdsampling, elements[j], PointerGetDatum(dict)))) + break; + + pfree(elements); + pfree(nulls); + PG_RETURN_BOOL(true); +} + +Datum +range_typzstdsampling(PG_FUNCTION_ARGS) +{ + RangeType *range = DatumGetRangeTypeP(PG_GETARG_DATUM(0)); + ZstdTrainingData *dict = (ZstdTrainingData *) PG_GETARG_POINTER(1); + RangeBound lower, + upper; + bool empty; + + /* Get information about range type; note column might be a domain */ + TypeCacheEntry *typcache = range_get_typcache(fcinfo, getBaseType(range->rangetypid)); + + /* If the type does not supply a builder, skip */ + if (!OidIsValid(typcache->rngelemtype->typzstdsampling)) + PG_RETURN_BOOL(false); + + range_deserialize(typcache, range, &lower, &upper, &empty); + if (empty) + PG_RETURN_BOOL(false); + + OidFunctionCall2(typcache->rngelemtype->typzstdsampling, lower.val, PointerGetDatum(dict)); + + OidFunctionCall2(typcache->rngelemtype->typzstdsampling, upper.val, PointerGetDatum(dict)); + + PG_RETURN_BOOL(true); +} + +Datum +multirange_typzstdsampling(PG_FUNCTION_ARGS) +{ + MultirangeType *mrange = DatumGetMultirangeTypeP(PG_GETARG_DATUM(0)); + ZstdTrainingData *dict = (ZstdTrainingData *) PG_GETARG_POINTER(1); + int32 rangeCount; + RangeType **ranges; + + /* Get information about multirange type; note column might be a domain */ + TypeCacheEntry *typcache = multirange_get_typcache(fcinfo, getBaseType(mrange->multirangetypid)); + + /* If the type does not supply a builder, skip */ + if (!OidIsValid(typcache->rngtype->typzstdsampling)) + PG_RETURN_BOOL(false); + + /* Deserialize the multirange into an array of RangeType pointers */ + multirange_deserialize(typcache->rngtype, mrange, &rangeCount, &ranges); + + for (int j = 0; j < rangeCount; j++) + if (!DatumGetBool(OidFunctionCall2(typcache->rngtype->typzstdsampling, RangeTypePGetDatum(ranges[j]), PointerGetDatum(dict)))) + break; + + PG_RETURN_BOOL(true); +} + +Datum +composite_typzstdsampling(PG_FUNCTION_ARGS) +{ + HeapTupleHeader th = DatumGetHeapTupleHeader(PG_GETARG_DATUM(0)); + ZstdTrainingData *dict = (ZstdTrainingData *) PG_GETARG_POINTER(1); + + TupleDesc tupdesc = lookup_rowtype_tupdesc(HeapTupleHeaderGetTypeId(th), HeapTupleHeaderGetTypMod(th)); + HeapTupleData tuple = {.t_data = th,.t_len = HeapTupleHeaderGetDatumLength(th),.t_tableOid = InvalidOid}; + + ItemPointerSetInvalid(&tuple.t_self); + + for (int i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + bool isnull; + Datum fieldDatum; + + if (attr->attisdropped || attr->atthasmissing) + continue; + + fieldDatum = heap_getattr(&tuple, i + 1, tupdesc, &isnull); + + if (!isnull) + { + /* Look up the type cache entry for the attribute's type */ + TypeCacheEntry *typcache = lookup_type_cache(attr->atttypid, 0); + + if (OidIsValid(typcache->typzstdsampling) && !DatumGetBool(OidFunctionCall2(typcache->typzstdsampling, fieldDatum, PointerGetDatum(dict)))) + break; + } + } + + ReleaseTupleDesc(tupdesc); + PG_RETURN_BOOL(true); +} + +Datum +build_zstd_dict_for_attribute(PG_FUNCTION_ARGS) +{ +#ifndef USE_ZSTD + PG_RETURN_BOOL(false); +#else + text *tablename = PG_GETARG_TEXT_PP(0); + AttrNumber attno = PG_GETARG_INT32(1); + + /* Look up table name. */ + RangeVar *tablerel = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); + Oid tableoid = RangeVarGetRelid(tablerel, NoLock, false); + + bool ret = build_zstd_dictionary_internal(tableoid, attno); + + PG_RETURN_BOOL(ret); +#endif +} /* * get_zstd_dict - Fetches the ZSTD dictionary from the catalog diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 4fffb76e557..abb92a3a4fe 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -55,7 +55,7 @@ #include "utils/sortsupport.h" #include "utils/syscache.h" #include "utils/timestamp.h" - +#include "parser/analyze.h" /* Per-index data for ANALYZE */ typedef struct AnlIndexData @@ -85,9 +85,6 @@ static void compute_index_stats(Relation onerel, double totalrows, MemoryContext col_context); static VacAttrStats *examine_attribute(Relation onerel, int attnum, Node *index_expr); -static int acquire_sample_rows(Relation onerel, int elevel, - HeapTuple *rows, int targrows, - double *totalrows, double *totaldeadrows); static int compare_rows(const void *a, const void *b, void *arg); static int acquire_inherited_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, @@ -1195,7 +1192,7 @@ block_sampling_read_stream_next(ReadStream *stream, * block. The previous sampling method put too much credence in the row * density near the start of the table. */ -static int +int acquire_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows) diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 45ae7472ab5..8014ee507a7 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -95,6 +95,7 @@ typedef struct bool updateTypmodout; bool updateAnalyze; bool updateSubscript; + bool updateZstdSampling; /* New values for relevant attributes */ char storage; Oid receiveOid; @@ -103,6 +104,7 @@ typedef struct Oid typmodoutOid; Oid analyzeOid; Oid subscriptOid; + Oid zstdSamplingOid; } AlterTypeRecurseParams; /* Potentially set by pg_upgrade_support functions */ @@ -122,6 +124,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid); static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); +static Oid findTypeZstdSamplingFunction(List *procname, Oid typeOid); static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid); static Oid findRangeSubOpclass(List *opcname, Oid subtype); static Oid findRangeCanonicalFunction(List *procname, Oid typeOid); @@ -162,6 +165,7 @@ DefineType(ParseState *pstate, List *names, List *parameters) List *typmodoutName = NIL; List *analyzeName = NIL; List *subscriptName = NIL; + List *zstdSamplingName = NIL; char category = TYPCATEGORY_USER; bool preferred = false; char delimiter = DEFAULT_TYPDELIM; @@ -190,6 +194,7 @@ DefineType(ParseState *pstate, List *names, List *parameters) DefElem *alignmentEl = NULL; DefElem *storageEl = NULL; DefElem *collatableEl = NULL; + DefElem *zstdSamplingEl = NULL; Oid inputOid; Oid outputOid; Oid receiveOid = InvalidOid; @@ -198,6 +203,7 @@ DefineType(ParseState *pstate, List *names, List *parameters) Oid typmodoutOid = InvalidOid; Oid analyzeOid = InvalidOid; Oid subscriptOid = InvalidOid; + Oid zstdSamplingOid = InvalidOid; char *array_type; Oid array_oid; Oid typoid; @@ -323,6 +329,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) defelp = &storageEl; else if (strcmp(defel->defname, "collatable") == 0) defelp = &collatableEl; + else if (strcmp(defel->defname, "zstd_sampling") == 0) + defelp = &zstdSamplingEl; else { /* WARNING, not ERROR, for historical backwards-compatibility */ @@ -455,6 +463,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) } if (collatableEl) collation = defGetBoolean(collatableEl) ? DEFAULT_COLLATION_OID : InvalidOid; + if (zstdSamplingEl) + zstdSamplingName = defGetQualifiedName(zstdSamplingEl); /* * make sure we have our required definitions @@ -516,6 +526,15 @@ DefineType(ParseState *pstate, List *names, List *parameters) errmsg("element type cannot be specified without a subscripting function"))); } + if (zstdSamplingName) + { + if (internalLength != -1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type zstd_sampling function must be specified only if data type is variable length."))); + zstdSamplingOid = findTypeZstdSamplingFunction(zstdSamplingName, typoid); + } + /* * Check permissions on functions. We choose to require the creator/owner * of a type to also own the underlying functions. Since creating a type @@ -550,6 +569,9 @@ DefineType(ParseState *pstate, List *names, List *parameters) if (analyzeOid && !object_ownercheck(ProcedureRelationId, analyzeOid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, NameListToString(analyzeName)); + if (zstdSamplingOid && !object_ownercheck(ProcedureRelationId, zstdSamplingOid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, + NameListToString(zstdSamplingName)); if (subscriptOid && !object_ownercheck(ProcedureRelationId, subscriptOid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, NameListToString(subscriptName)); @@ -601,7 +623,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) -1, /* typMod (Domains only) */ 0, /* Array Dimensions of typbasetype */ false, /* Type NOT NULL */ - collation); /* type's collation */ + collation, /* type's collation */ + zstdSamplingOid); /* zstd_sampling procedure */ Assert(typoid == address.objectId); /* @@ -643,7 +666,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - collation); /* type's collation */ + collation, /* type's collation */ + F_ARRAY_TYPZSTDSAMPLING); /* zstd_sampling procedure */ pfree(array_type); @@ -706,6 +730,7 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) Oid receiveProcedure; Oid sendProcedure; Oid analyzeProcedure; + Oid zstdSamplingOid; bool byValue; char category; char delimiter; @@ -842,6 +867,9 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) /* Analysis function */ analyzeProcedure = baseType->typanalyze; + /* Generate dictionary function */ + zstdSamplingOid = baseType->typzstdsampling; + /* * Domains don't need a subscript function, since they are not * subscriptable on their own. If the base type is subscriptable, the @@ -1078,7 +1106,8 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) basetypeMod, /* typeMod value */ typNDims, /* Array dimensions for base type */ typNotNull, /* Type NOT NULL */ - domaincoll); /* type's collation */ + domaincoll, /* type's collation */ + zstdSamplingOid); /* zstd_sampling procedure */ /* * Create the array type that goes with it. @@ -1119,7 +1148,8 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - domaincoll); /* type's collation */ + domaincoll, /* type's collation */ + F_ARRAY_TYPZSTDSAMPLING); /* zstd_sampling procedure */ pfree(domainArrayName); @@ -1241,7 +1271,8 @@ DefineEnum(CreateEnumStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation */ + InvalidOid, /* type's collation */ + InvalidOid); /* generate dictionary procedure - default */ /* Enter the enum's values into pg_enum */ EnumValuesCreate(enumTypeAddr.objectId, stmt->vals); @@ -1282,7 +1313,8 @@ DefineEnum(CreateEnumStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation */ + InvalidOid, /* type's collation */ + InvalidOid); /* generate dictionary procedure - default */ pfree(enumArrayName); @@ -1583,7 +1615,9 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation (ranges never have one) */ + InvalidOid, /* type's collation (ranges never have one) */ + F_RANGE_TYPZSTDSAMPLING); /* generate dictionary + * procedure - default */ Assert(typoid == InvalidOid || typoid == address.objectId); typoid = address.objectId; @@ -1646,11 +1680,13 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) NULL, /* no binary form available either */ false, /* never passed by value */ alignment, /* alignment */ - 'x', /* TOAST strategy (always extended) */ + TYPSTORAGE_EXTENDED, /* TOAST strategy (always extended) */ -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation (ranges never have one) */ + InvalidOid, /* type's collation (ranges never have one) */ + F_MULTIRANGE_TYPZSTDSAMPLING); /* generate dictionary + * procedure - default */ Assert(multirangeOid == mltrngaddress.objectId); /* Create the entry in pg_range */ @@ -1693,7 +1729,9 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* typcollation */ + InvalidOid, /* typcollation */ + F_ARRAY_TYPZSTDSAMPLING); /* generate dictionary procedure - + * default */ pfree(rangeArrayName); @@ -1728,11 +1766,13 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) NULL, /* binary default isn't sent either */ false, /* never passed by value */ alignment, /* alignment - same as range's */ - 'x', /* ARRAY is always toastable */ + TYPSTORAGE_EXTENDED, /* ARRAY is always toastable */ -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* typcollation */ + InvalidOid, /* typcollation */ + F_ARRAY_TYPZSTDSAMPLING); /* generate dictionary procedure - + * default */ /* And create the constructor functions for this range type */ makeRangeConstructors(typeName, typeNamespace, typoid, rangeSubtype); @@ -2261,6 +2301,31 @@ findTypeAnalyzeFunction(List *procname, Oid typeOid) return procOid; } +static Oid +findTypeZstdSamplingFunction(List *procname, Oid typeOid) +{ + Oid argList[2]; + Oid procOid; + + argList[0] = INTERNALOID; + argList[1] = INTERNALOID; + + procOid = LookupFuncName(procname, 2, argList, true); + if (!OidIsValid(procOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("function %s does not exist", + func_signature_string(procname, 2, NIL, argList)))); + + if (get_func_rettype(procOid) != BOOLOID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type build zstd dictionary function %s must return type %s", + NameListToString(procname), "internal"))); + + return procOid; +} + static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid) { @@ -4444,6 +4509,19 @@ AlterType(AlterTypeStmt *stmt) /* Replacing a subscript function requires superuser. */ requireSuper = true; } + else if (strcmp(defel->defname, "zstd_sampling") == 0) + { + if (defel->arg != NULL) + atparams.zstdSamplingOid = + findTypeZstdSamplingFunction(defGetQualifiedName(defel), + typeOid); + else + atparams.zstdSamplingOid = InvalidOid; /* NONE, remove function */ + + atparams.updateZstdSampling = true; + /* Replacing a canonical function requires superuser. */ + requireSuper = true; + } /* * The rest of the options that CREATE accepts cannot be changed. @@ -4606,6 +4684,11 @@ AlterTypeRecurse(Oid typeOid, bool isImplicitArray, replaces[Anum_pg_type_typsubscript - 1] = true; values[Anum_pg_type_typsubscript - 1] = ObjectIdGetDatum(atparams->subscriptOid); } + if (atparams->updateZstdSampling) + { + replaces[Anum_pg_type_typzstdsampling - 1] = true; + values[Anum_pg_type_typzstdsampling - 1] = ObjectIdGetDatum(atparams->zstdSamplingOid); + } newtup = heap_modify_tuple(tup, RelationGetDescr(catalog), values, nulls, replaces); diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index ae65a1cce06..7cc2d81cf42 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -501,6 +501,7 @@ lookup_type_cache(Oid type_id, int flags) typentry->typelem = typtup->typelem; typentry->typarray = typtup->typarray; typentry->typcollation = typtup->typcollation; + typentry->typzstdsampling = typtup->typzstdsampling; typentry->flags |= TCFLAGS_HAVE_PG_TYPE_DATA; /* If it's a domain, immediately thread it into the domain cache list */ @@ -547,6 +548,7 @@ lookup_type_cache(Oid type_id, int flags) typentry->typelem = typtup->typelem; typentry->typarray = typtup->typarray; typentry->typcollation = typtup->typcollation; + typentry->typzstdsampling = typtup->typzstdsampling; typentry->flags |= TCFLAGS_HAVE_PG_TYPE_DATA; ReleaseSysCache(tp); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 62beb71da28..7d2286850dc 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12566,4 +12566,39 @@ proargnames => '{pid,io_id,io_generation,state,operation,off,length,target,handle_data_len,raw_result,result,target_desc,f_sync,f_localmem,f_buffered}', prosrc => 'pg_get_aios' }, +# ZSTD related functions +{ oid => '9241', descr => 'array typzstdsampling.', + proname => 'array_typzstdsampling', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'array_typzstdsampling' }, + +{ oid => '9242', descr => 'range typzstdsampling.', + proname => 'range_typzstdsampling', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'range_typzstdsampling' }, + +{ oid => '9243', descr => 'multirange typzstdsampling.', + proname => 'multirange_typzstdsampling', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'multirange_typzstdsampling' }, + +{ oid => '9244', descr => 'composite typzstdsampling.', + proname => 'composite_typzstdsampling', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'composite_typzstdsampling' }, + +{ oid => '9245', descr => 'Build zstd dictionaries for a column.', + proname => 'build_zstd_dict_for_attribute', provolatile => 'v', prorettype => 'bool', + proargtypes => 'text int4', proparallel => 'u', + prosrc => 'build_zstd_dict_for_attribute' }, + +{ oid => '9247', descr => 'ZSTD standard sampling for jsonb', + proname => 'std_zstd_sampling_for_jsonb', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'std_zstd_sampling_for_jsonb' }, + +{ oid => '9248', descr => 'ZSTD standard sampling for text', + proname => 'std_zstd_sampling_for_text', provolatile => 'v', prorettype => 'bool', + proargtypes => 'internal internal', + prosrc => 'std_zstd_sampling_for_text' }, ] diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index 6dca77e0a22..a151ba33f82 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -83,7 +83,7 @@ typname => 'text', typlen => '-1', typbyval => 'f', typcategory => 'S', typispreferred => 't', typinput => 'textin', typoutput => 'textout', typreceive => 'textrecv', typsend => 'textsend', typalign => 'i', - typstorage => 'x', typcollation => 'default' }, + typstorage => 'x', typcollation => 'default', typzstdsampling => 'std_zstd_sampling_for_text' }, { oid => '26', array_type_oid => '1028', descr => 'object identifier(oid), maximum 4 billion', typname => 'oid', typlen => '4', typbyval => 't', typcategory => 'N', @@ -446,7 +446,7 @@ typname => 'jsonb', typlen => '-1', typbyval => 'f', typcategory => 'U', typsubscript => 'jsonb_subscript_handler', typinput => 'jsonb_in', typoutput => 'jsonb_out', typreceive => 'jsonb_recv', typsend => 'jsonb_send', - typalign => 'i', typstorage => 'x' }, + typalign => 'i', typstorage => 'x', typzstdsampling => 'std_zstd_sampling_for_jsonb' }, { oid => '4072', array_type_oid => '4073', descr => 'JSON path', typname => 'jsonpath', typlen => '-1', typbyval => 'f', typcategory => 'U', typinput => 'jsonpath_in', typoutput => 'jsonpath_out', diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h index ff666711a54..6f53b79feda 100644 --- a/src/include/catalog/pg_type.h +++ b/src/include/catalog/pg_type.h @@ -227,6 +227,11 @@ CATALOG(pg_type,1247,TypeRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(71,TypeRelati */ Oid typcollation BKI_DEFAULT(0) BKI_LOOKUP_OPT(pg_collation); + /* + * Custom zstd sampling procedure for the datatype. + */ + regproc typzstdsampling BKI_DEFAULT(0) BKI_ARRAY_DEFAULT(array_typzstdsampling) BKI_LOOKUP_OPT(pg_proc); + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* @@ -380,7 +385,8 @@ extern ObjectAddress TypeCreate(Oid newTypeOid, int32 typeMod, int32 typNDims, bool typeNotNull, - Oid typeCollation); + Oid typeCollation, + Oid zstdSamplingProcedure); extern void GenerateTypeDependencies(HeapTuple typeTuple, Relation typeCatalog, diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h index f29ed03b476..69391e40d0c 100644 --- a/src/include/parser/analyze.h +++ b/src/include/parser/analyze.h @@ -17,6 +17,7 @@ #include "nodes/params.h" #include "nodes/queryjumble.h" #include "parser/parse_node.h" +#include "access/htup.h" /* Hook for plugins to get control at end of parse analysis */ typedef void (*post_parse_analyze_hook_type) (ParseState *pstate, @@ -65,4 +66,8 @@ extern List *BuildOnConflictExcludedTargetlist(Relation targetrel, extern SortGroupClause *makeSortGroupClauseForSetOp(Oid rescoltype, bool require_hash); +extern int acquire_sample_rows(Relation onerel, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, double *totaldeadrows); + #endif /* ANALYZE_H */ diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 1cb30f1818c..c5bf668e519 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -46,6 +46,7 @@ typedef struct TypeCacheEntry Oid typelem; Oid typarray; Oid typcollation; + Oid typzstdsampling; /* * Information obtained from opfamily entries diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be3..09c750d04fa 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -71,6 +71,7 @@ NOTICE: checking pg_type {typmodout} => pg_proc {oid} NOTICE: checking pg_type {typanalyze} => pg_proc {oid} NOTICE: checking pg_type {typbasetype} => pg_type {oid} NOTICE: checking pg_type {typcollation} => pg_collation {oid} +NOTICE: checking pg_type {typzstdsampling} => pg_proc {oid} NOTICE: checking pg_attribute {attrelid} => pg_class {oid} NOTICE: checking pg_attribute {atttypid} => pg_type {oid} NOTICE: checking pg_attribute {attcollation} => pg_collation {oid} diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 86fb79b2076..06a36903a83 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2641,6 +2641,8 @@ STARTUPINFO STRLEN SV SYNCHRONIZATION_BARRIER +SampleCollector +SampleEntry SampleScan SampleScanGetSampleSize_function SampleScanState @@ -3370,6 +3372,7 @@ ZSTD_cParameter ZSTD_inBuffer ZSTD_outBuffer ZstdCompressorState +ZstdTrainingData _SPI_connection _SPI_plan __m128i -- 2.47.1