From bb4d6120cc466a515268ddc80e7229f1b00f0801 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Veldanda Date: Sat, 8 Mar 2025 13:44:38 +0000 Subject: [PATCH v2] Add ZStandard (with dictionaries) compression support for TOAST --- contrib/amcheck/verify_heapam.c | 1 + doc/src/sgml/catalogs.sgml | 55 ++ doc/src/sgml/ref/create_type.sgml | 21 +- src/backend/access/brin/brin_tuple.c | 21 +- src/backend/access/common/detoast.c | 12 +- src/backend/access/common/indextuple.c | 16 +- src/backend/access/common/reloptions.c | 36 +- src/backend/access/common/toast_compression.c | 275 ++++++++- src/backend/access/common/toast_internals.c | 17 +- src/backend/access/table/toast_helper.c | 21 +- src/backend/catalog/Makefile | 3 +- src/backend/catalog/heap.c | 8 +- src/backend/catalog/meson.build | 1 + src/backend/catalog/pg_type.c | 11 +- src/backend/catalog/pg_zstd_dictionaries.c | 566 ++++++++++++++++++ src/backend/commands/analyze.c | 7 +- src/backend/commands/typecmds.c | 99 ++- src/backend/utils/adt/varlena.c | 3 + src/backend/utils/misc/guc_tables.c | 3 + src/backend/utils/misc/postgresql.conf.sample | 2 +- src/bin/pg_dump/pg_dump.c | 25 +- src/bin/psql/describe.c | 5 +- src/include/access/toast_compression.h | 13 +- src/include/access/toast_helper.h | 2 + src/include/access/toast_internals.h | 51 +- src/include/catalog/Makefile | 3 +- src/include/catalog/catversion.h | 2 +- src/include/catalog/meson.build | 1 + src/include/catalog/pg_proc.dat | 10 + src/include/catalog/pg_type.dat | 6 +- src/include/catalog/pg_type.h | 8 +- src/include/catalog/pg_zstd_dictionaries.h | 53 ++ src/include/parser/analyze.h | 5 + src/include/utils/attoptcache.h | 6 + src/include/varatt.h | 67 ++- src/test/regress/expected/compression.out | 5 +- src/test/regress/expected/compression_1.out | 3 + .../regress/expected/compression_zstd.out | 123 ++++ .../regress/expected/compression_zstd_1.out | 181 ++++++ src/test/regress/expected/oidjoins.out | 1 + src/test/regress/parallel_schedule | 2 +- 
src/test/regress/sql/compression.sql | 1 + src/test/regress/sql/compression_zstd.sql | 97 +++ src/tools/pgindent/typedefs.list | 5 + 44 files changed, 1763 insertions(+), 90 deletions(-) create mode 100644 src/backend/catalog/pg_zstd_dictionaries.c create mode 100644 src/include/catalog/pg_zstd_dictionaries.h create mode 100644 src/test/regress/expected/compression_zstd.out create mode 100644 src/test/regress/expected/compression_zstd_1.out create mode 100644 src/test/regress/sql/compression_zstd.sql diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c index 827312306f..f01cc940e3 100644 --- a/contrib/amcheck/verify_heapam.c +++ b/contrib/amcheck/verify_heapam.c @@ -1700,6 +1700,7 @@ check_tuple_attribute(HeapCheckContext *ctx) /* List of all valid compression method IDs */ case TOAST_PGLZ_COMPRESSION_ID: case TOAST_LZ4_COMPRESSION_ID: + case TOAST_ZSTD_COMPRESSION_ID: valid = true; break; diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index fb05063555..ed4c51a678 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -369,6 +369,12 @@ pg_user_mapping mappings of users to foreign servers + + + pg_zstd_dictionaries + Zstandard dictionaries + + @@ -9779,4 +9785,53 @@ SCRAM-SHA-256$<iteration count>:&l + + + <structname>pg_zstd_dictionaries</structname> + + + pg_zstd_dictionaries + + + + The catalog pg_zstd_dictionaries maintains the dictionaries essential for Zstandard compression and decompression. + + + + <structname>pg_zstd_dictionaries</structname> Columns + + + + + Column Type + Description + + + + + + + + dictid oid + + + Dictionary identifier; a non-null OID that uniquely identifies a dictionary. + + + + + + + dict bytea + + + Variable-length field containing the zstd dictionary data. This field must not be null. + + + + + +
+
+ diff --git a/doc/src/sgml/ref/create_type.sgml b/doc/src/sgml/ref/create_type.sgml index 994dfc6526..ad4cf2f8b3 100644 --- a/doc/src/sgml/ref/create_type.sgml +++ b/doc/src/sgml/ref/create_type.sgml @@ -56,6 +56,7 @@ CREATE TYPE name ( [ , ELEMENT = element ] [ , DELIMITER = delimiter ] [ , COLLATABLE = collatable ] + [ , BUILD_ZSTD_DICT = zstd_training_function ] ) CREATE TYPE name @@ -211,7 +212,8 @@ CREATE TYPE name type_modifier_input_function, type_modifier_output_function, analyze_function, and - subscript_function + subscript_function, and + zstd_training_function are optional. Generally these functions have to be coded in C or another low-level language. @@ -491,6 +493,15 @@ CREATE TYPE name make use of the collation information; this does not happen automatically merely by marking the type collatable. + + + The optional zstd_training_function + performs type-specific sample collection for a column of the corresponding data type. + By default, for jsonb, text, and bytea data types, the function zstd_dictionary_builder is defined. It attempts to gather samples for a column + and returns a sample buffer for zstd dictionary training. The training function must be declared to accept two arguments of type internal and return an internal result. + The detailed information for zstd training function is provided in src/backend/catalog/pg_zstd_dictionaries.c. + + @@ -846,6 +857,14 @@ CREATE TYPE name + + build_zstd_dict + + + Specifies the name of a function that performs sampling and provides the logic necessary to generate a sample buffer for zstd training. + + + diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c index 861f397e6d..02f1996ede 100644 --- a/src/backend/access/brin/brin_tuple.c +++ b/src/backend/access/brin/brin_tuple.c @@ -40,7 +40,7 @@ #include "access/tupmacs.h" #include "utils/datum.h" #include "utils/memutils.h" - +#include "utils/attoptcache.h" /* * This enables de-toasting of index entries. 
Needed until VACUUM is @@ -222,7 +222,8 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple, atttype->typstorage == TYPSTORAGE_MAIN)) { Datum cvalue; - char compression; + CompressionInfo cmp = {.cmethod = InvalidCompressionMethod,.dictid = InvalidDictId,.zstd_level = DEFAULT_ZSTD_LEVEL}; + Form_pg_attribute att = TupleDescAttr(brdesc->bd_tupdesc, keyno); @@ -233,11 +234,19 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple, * default method. */ if (att->atttypid == atttype->type_id) - compression = att->attcompression; - else - compression = InvalidCompressionMethod; + cmp.cmethod = att->attcompression; - cvalue = toast_compress_datum(value, compression); + if (cmp.cmethod == TOAST_ZSTD_COMPRESSION) + { + AttributeOpts *aopt = get_attribute_options(att->attrelid, att->attnum); + + if (aopt != NULL) + { + cmp.zstd_level = aopt->zstd_level; + cmp.dictid = (Oid) aopt->dictid; + } + } + cvalue = toast_compress_datum(value, cmp); if (DatumGetPointer(cvalue) != NULL) { diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c index 6265178774..b57a9f024c 100644 --- a/src/backend/access/common/detoast.c +++ b/src/backend/access/common/detoast.c @@ -246,10 +246,10 @@ detoast_attr_slice(struct varlena *attr, * Determine maximum amount of compressed data needed for a prefix * of a given length (after decompression). * - * At least for now, if it's LZ4 data, we'll have to fetch the - * whole thing, because there doesn't seem to be an API call to - * determine how much compressed data we need to be sure of being - * able to decompress the required slice. + * At least for now, if it's LZ4 or Zstandard data, we'll have to + * fetch the whole thing, because there doesn't seem to be an API + * call to determine how much compressed data we need to be sure + * of being able to decompress the required slice. 
*/ if (VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer) == TOAST_PGLZ_COMPRESSION_ID) @@ -485,6 +485,8 @@ toast_decompress_datum(struct varlena *attr) return pglz_decompress_datum(attr); case TOAST_LZ4_COMPRESSION_ID: return lz4_decompress_datum(attr); + case TOAST_ZSTD_COMPRESSION_ID: + return zstd_decompress_datum(attr); default: elog(ERROR, "invalid compression method id %d", cmid); return NULL; /* keep compiler quiet */ @@ -528,6 +530,8 @@ toast_decompress_datum_slice(struct varlena *attr, int32 slicelength) return pglz_decompress_datum_slice(attr, slicelength); case TOAST_LZ4_COMPRESSION_ID: return lz4_decompress_datum_slice(attr, slicelength); + case TOAST_ZSTD_COMPRESSION_ID: + return zstd_decompress_datum_slice(attr, slicelength); default: elog(ERROR, "invalid compression method id %d", cmid); return NULL; /* keep compiler quiet */ diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c index 1986b943a2..9cf5aabf51 100644 --- a/src/backend/access/common/indextuple.c +++ b/src/backend/access/common/indextuple.c @@ -21,6 +21,7 @@ #include "access/htup_details.h" #include "access/itup.h" #include "access/toast_internals.h" +#include "utils/attoptcache.h" /* * This enables de-toasting of index entries. 
Needed until VACUUM is @@ -124,8 +125,19 @@ index_form_tuple_context(TupleDesc tupleDescriptor, { Datum cvalue; - cvalue = toast_compress_datum(untoasted_values[i], - att->attcompression); + CompressionInfo cmp = {.cmethod = att->attcompression,.dictid = InvalidDictId,.zstd_level = DEFAULT_ZSTD_LEVEL}; + + if (cmp.cmethod == TOAST_ZSTD_COMPRESSION) + { + AttributeOpts *aopt = get_attribute_options(att->attrelid, att->attnum); + + if (aopt != NULL) + { + cmp.zstd_level = aopt->zstd_level; + cmp.dictid = (Oid) aopt->dictid; + } + } + cvalue = toast_compress_datum(untoasted_values[i], cmp); if (DatumGetPointer(cvalue) != NULL) { diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 59fb53e770..7c71fb3492 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -34,6 +34,7 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "access/toast_compression.h" /* * Contents of pg_class.reloptions @@ -389,7 +390,26 @@ static relopt_int intRelOpts[] = }, -1, 0, 1024 }, - + { + { + "zstd_dict_size", + "Max dict size for zstd", + RELOPT_KIND_ATTRIBUTE, + ShareUpdateExclusiveLock + }, + DEFAULT_ZSTD_DICT_SIZE, 0, 112640 /* Max dict size(110 KB), 0 + * indicates Don't use dictionary + * for compression */ + }, + { + { + "zstd_level", + "Set column's ZSTD compression level", + RELOPT_KIND_ATTRIBUTE, + ShareUpdateExclusiveLock + }, + DEFAULT_ZSTD_LEVEL, 1, 22 + }, /* list terminator */ {{NULL}} }; @@ -478,6 +498,15 @@ static relopt_real realRelOpts[] = }, 0, -1.0, DBL_MAX }, + { + { + "dictid", + "Current dictid for column", + RELOPT_KIND_ATTRIBUTE, + ShareUpdateExclusiveLock + }, + InvalidDictId, InvalidDictId, UINT32_MAX + }, { { "vacuum_cleanup_index_scale_factor", @@ -2093,7 +2122,10 @@ attribute_reloptions(Datum reloptions, bool validate) { static const relopt_parse_elt tab[] = { {"n_distinct", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct)}, - 
{"n_distinct_inherited", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct_inherited)} + {"n_distinct_inherited", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct_inherited)}, + {"dictid", RELOPT_TYPE_REAL, offsetof(AttributeOpts, dictid)}, + {"zstd_dict_size", RELOPT_TYPE_INT, offsetof(AttributeOpts, zstd_dict_size)}, + {"zstd_level", RELOPT_TYPE_INT, offsetof(AttributeOpts, zstd_level)}, }; return (bytea *) build_reloptions(reloptions, validate, diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c index 21f2f4af97..2a271b8bcd 100644 --- a/src/backend/access/common/toast_compression.c +++ b/src/backend/access/common/toast_compression.c @@ -17,19 +17,26 @@ #include #endif +#ifdef USE_ZSTD +#include +#include +#endif + #include "access/detoast.h" #include "access/toast_compression.h" #include "common/pg_lzcompress.h" #include "varatt.h" +#include "catalog/pg_zstd_dictionaries.h" +#include "access/toast_internals.h" /* GUC */ int default_toast_compression = TOAST_PGLZ_COMPRESSION; -#define NO_LZ4_SUPPORT() \ +#define NO_METHOD_SUPPORT(method) \ ereport(ERROR, \ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ - errmsg("compression method lz4 not supported"), \ - errdetail("This functionality requires the server to be built with lz4 support."))) + errmsg("compression method %s not supported", method), \ + errdetail("This functionality requires the server to be built with %s support.", method))) /* * Compress a varlena using PGLZ. 
@@ -139,7 +146,7 @@ struct varlena * lz4_compress_datum(const struct varlena *value) { #ifndef USE_LZ4 - NO_LZ4_SUPPORT(); + NO_METHOD_SUPPORT("lz4"); return NULL; /* keep compiler quiet */ #else int32 valsize; @@ -182,7 +189,7 @@ struct varlena * lz4_decompress_datum(const struct varlena *value) { #ifndef USE_LZ4 - NO_LZ4_SUPPORT(); + NO_METHOD_SUPPORT("lz4"); return NULL; /* keep compiler quiet */ #else int32 rawsize; @@ -215,7 +222,7 @@ struct varlena * lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength) { #ifndef USE_LZ4 - NO_LZ4_SUPPORT(); + NO_METHOD_SUPPORT("lz4"); return NULL; /* keep compiler quiet */ #else int32 rawsize; @@ -266,7 +273,13 @@ toast_get_compression_id(struct varlena *attr) VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); - if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) && VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer) >= TOAST_ZSTD_COMPRESSION_ID) + { + struct varlena *compressed_attr = detoast_external_attr(attr); + + cmid = TOAST_COMPRESS_METHOD(compressed_attr); + } + else cmid = VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer); } else if (VARATT_IS_COMPRESSED(attr)) @@ -289,10 +302,17 @@ CompressionNameToMethod(const char *compression) else if (strcmp(compression, "lz4") == 0) { #ifndef USE_LZ4 - NO_LZ4_SUPPORT(); + NO_METHOD_SUPPORT("lz4"); #endif return TOAST_LZ4_COMPRESSION; } + else if (strcmp(compression, "zstd") == 0) + { +#ifndef USE_ZSTD + NO_METHOD_SUPPORT("zstd"); +#endif + return TOAST_ZSTD_COMPRESSION; + } return InvalidCompressionMethod; } @@ -309,8 +329,247 @@ GetCompressionMethodName(char method) return "pglz"; case TOAST_LZ4_COMPRESSION: return "lz4"; + case TOAST_ZSTD_COMPRESSION: + return "zstd"; default: elog(ERROR, "invalid compression method %c", method); return NULL; /* keep compiler quiet */ } } + +/* Compress datum using ZSTD with optional dictionary (using cdict) */ +struct varlena * +zstd_compress_datum(const struct varlena 
*value, Oid dictid, int zstd_level) +{ +#ifdef USE_ZSTD + uint32 valsize = VARSIZE_ANY_EXHDR(value); + size_t max_size = ZSTD_compressBound(valsize); + struct varlena *compressed; + void *dest; + size_t cmp_size, + ret; + ZSTD_CCtx *cctx = ZSTD_createCCtx(); + ZSTD_CDict *cdict = NULL; + + if (!cctx) + ereport(ERROR, (errmsg("Failed to create ZSTD compression context"))); + + ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, zstd_level); + if (ZSTD_isError(ret)) + { + ZSTD_freeCCtx(cctx); + ereport(ERROR, (errmsg("Failed to reference ZSTD compression level: %s", ZSTD_getErrorName(ret)))); + } + + if (dictid != InvalidDictId) + { + bytea *dict_bytea = get_zstd_dict(dictid); + const void *dict_buffer = VARDATA_ANY(dict_bytea); + uint32 dict_size = VARSIZE_ANY(dict_bytea) - VARHDRSZ; + + cdict = ZSTD_createCDict(dict_buffer, dict_size, zstd_level); + pfree(dict_bytea); + + if (!cdict) + { + ZSTD_freeCCtx(cctx); + ereport(ERROR, (errmsg("Failed to create ZSTD compression dictionary"))); + } + + ret = ZSTD_CCtx_refCDict(cctx, cdict); + if (ZSTD_isError(ret)) + { + ZSTD_freeCDict(cdict); + ZSTD_freeCCtx(cctx); + ereport(ERROR, (errmsg("Failed to reference ZSTD dictionary: %s", ZSTD_getErrorName(ret)))); + } + } + + /* Allocate space for the compressed varlena (header + data) */ + compressed = (struct varlena *) palloc(max_size + VARHDRSZ_COMPRESSED_EXT); + dest = (char *) compressed + VARHDRSZ_COMPRESSED_EXT; + + /* Compress the data */ + cmp_size = ZSTD_compress2(cctx, dest, max_size, VARDATA_ANY(value), valsize); + + /* Cleanup */ + ZSTD_freeCDict(cdict); + ZSTD_freeCCtx(cctx); + + if (ZSTD_isError(cmp_size)) + { + pfree(compressed); + ereport(ERROR, (errmsg("ZSTD compression failed: %s", ZSTD_getErrorName(cmp_size)))); + } + + /* + * If compression did not reduce size, return NULL so that the + * uncompressed data is stored + */ + if (cmp_size > valsize) + { + pfree(compressed); + return NULL; + } + + /* Set the compressed size in the varlena header */ + 
SET_VARSIZE_COMPRESSED(compressed, cmp_size + VARHDRSZ_COMPRESSED_EXT); + return compressed; + +#else + NO_METHOD_SUPPORT("zstd"); + return NULL; +#endif +} + +struct varlena * +zstd_decompress_datum(const struct varlena *value) +{ +#ifdef USE_ZSTD + uint32 actual_size_exhdr = VARDATA_COMPRESSED_GET_EXTSIZE(value); + uint32 cmp_size_exhdr = VARSIZE_4B(value) - VARHDRSZ_COMPRESSED_EXT; + Oid dictid; + struct varlena *result; + size_t uncmp_size, + ret; + ZSTD_DCtx *dctx = ZSTD_createDCtx(); + ZSTD_DDict *ddict = NULL; + + if (!dctx) + ereport(ERROR, (errmsg("Failed to create ZSTD decompression context"))); + + + dictid = (Oid) VARDATA_COMPRESSED_GET_DICTID(value); + + if (dictid != InvalidDictId) + { + bytea *dict_bytea = get_zstd_dict(dictid); + const void *dict_buffer = VARDATA_ANY(dict_bytea); + uint32 dict_size = VARSIZE_ANY(dict_bytea) - VARHDRSZ; + + ddict = ZSTD_createDDict(dict_buffer, dict_size); + pfree(dict_bytea); + + if (!ddict) + { + ZSTD_freeDCtx(dctx); + ereport(ERROR, (errmsg("Failed to create ZSTD compression dictionary"))); + } + + ret = ZSTD_DCtx_refDDict(dctx, ddict); + if (ZSTD_isError(ret)) + { + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(dctx); + ereport(ERROR, (errmsg("Failed to reference ZSTD dictionary: %s", ZSTD_getErrorName(ret)))); + } + } + + /* Allocate space for the uncompressed data */ + result = (struct varlena *) palloc(actual_size_exhdr + VARHDRSZ); + + uncmp_size = ZSTD_decompressDCtx(dctx, + VARDATA(result), + actual_size_exhdr, + VARDATA_4B_C(value), + cmp_size_exhdr); + + /* Cleanup */ + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(dctx); + + if (ZSTD_isError(uncmp_size)) + { + pfree(result); + ereport(ERROR, (errmsg("ZSTD decompression failed: %s", ZSTD_getErrorName(uncmp_size)))); + } + + /* Set final size in the varlena header */ + SET_VARSIZE(result, uncmp_size + VARHDRSZ); + return result; + +#else + NO_METHOD_SUPPORT("zstd"); + return NULL; +#endif +} + +/* Decompress a slice of the datum using the streaming API and optional 
dictionary */ +struct varlena * +zstd_decompress_datum_slice(const struct varlena *value, int32 slicelength) +{ +#ifdef USE_ZSTD + struct varlena *result; + ZSTD_inBuffer inBuf; + ZSTD_outBuffer outBuf; + ZSTD_DCtx *dctx = ZSTD_createDCtx(); + ZSTD_DDict *ddict = NULL; + Oid dictid; + uint32 cmp_size_exhdr = VARSIZE_4B(value) - VARHDRSZ_COMPRESSED_EXT; + size_t ret; + + if (dctx == NULL) + elog(ERROR, "could not create zstd decompression context"); + + /* Extract the dictionary ID from the compressed frame */ + dictid = (Oid) ZSTD_getDictID_fromFrame(VARDATA_4B_C(value), cmp_size_exhdr); + + if (dictid != InvalidDictId) + { + bytea *dict_bytea = get_zstd_dict(dictid); + const void *dict_buffer = VARDATA_ANY(dict_bytea); + uint32 dict_size = VARSIZE_ANY(dict_bytea) - VARHDRSZ; + + /* Create and bind the dictionary to the decompression context */ + ddict = ZSTD_createDDict(dict_buffer, dict_size); + pfree(dict_bytea); + + if (!ddict) + { + ZSTD_freeDCtx(dctx); + ereport(ERROR, (errmsg("Failed to create ZSTD compression dictionary"))); + } + + ret = ZSTD_DCtx_refDDict(dctx, ddict); + if (ZSTD_isError(ret)) + { + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(dctx); + ereport(ERROR, (errmsg("Failed to reference ZSTD dictionary: %s", ZSTD_getErrorName(ret)))); + } + } + + inBuf.src = (char *) value + VARHDRSZ_COMPRESSED_EXT; + inBuf.size = VARSIZE(value) - VARHDRSZ_COMPRESSED_EXT; + inBuf.pos = 0; + + result = (struct varlena *) palloc(slicelength + VARHDRSZ); + outBuf.dst = (char *) result + VARHDRSZ; + outBuf.size = slicelength; + outBuf.pos = 0; + + /* Common decompression loop */ + while (inBuf.pos < inBuf.size && outBuf.pos < outBuf.size) + { + ret = ZSTD_decompressStream(dctx, &outBuf, &inBuf); + if (ZSTD_isError(ret)) + { + pfree(result); + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(dctx); + elog(ERROR, "zstd decompression failed: %s", ZSTD_getErrorName(ret)); + } + } + + /* Cleanup */ + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(dctx); + + Assert(outBuf.size == slicelength && 
outBuf.pos == slicelength); + SET_VARSIZE(result, outBuf.pos + VARHDRSZ); + return result; +#else + NO_METHOD_SUPPORT("zstd"); + return NULL; +#endif +} diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 7d8be8346c..e6c877fd0a 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -43,11 +43,12 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid); * ---------- */ Datum -toast_compress_datum(Datum value, char cmethod) +toast_compress_datum(Datum value, CompressionInfo cmp) { struct varlena *tmp = NULL; int32 valsize; ToastCompressionId cmid = TOAST_INVALID_COMPRESSION_ID; + uint32 dictid = cmp.dictid; Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value))); Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value))); @@ -55,13 +56,13 @@ toast_compress_datum(Datum value, char cmethod) valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); /* If the compression method is not valid, use the current default */ - if (!CompressionMethodIsValid(cmethod)) - cmethod = default_toast_compression; + if (!CompressionMethodIsValid(cmp.cmethod)) + cmp.cmethod = default_toast_compression; /* * Call appropriate compression routine for the compression method. 
*/ - switch (cmethod) + switch (cmp.cmethod) { case TOAST_PGLZ_COMPRESSION: tmp = pglz_compress_datum((const struct varlena *) value); @@ -71,8 +72,12 @@ toast_compress_datum(Datum value, char cmethod) tmp = lz4_compress_datum((const struct varlena *) value); cmid = TOAST_LZ4_COMPRESSION_ID; break; + case TOAST_ZSTD_COMPRESSION: + tmp = zstd_compress_datum((const struct varlena *) value, cmp.dictid, cmp.zstd_level); + cmid = TOAST_ZSTD_COMPRESSION_ID; + break; default: - elog(ERROR, "invalid compression method %c", cmethod); + elog(ERROR, "invalid compression method %c", cmp.cmethod); } if (tmp == NULL) @@ -92,7 +97,7 @@ toast_compress_datum(Datum value, char cmethod) { /* successful compression */ Assert(cmid != TOAST_INVALID_COMPRESSION_ID); - TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(tmp, valsize, cmid); + TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(tmp, valsize, cmid, dictid); return PointerGetDatum(tmp); } else diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c index b60fab0a4d..968dd9f7c0 100644 --- a/src/backend/access/table/toast_helper.c +++ b/src/backend/access/table/toast_helper.c @@ -19,7 +19,8 @@ #include "access/toast_internals.h" #include "catalog/pg_type_d.h" #include "varatt.h" - +#include "utils/attoptcache.h" +#include "access/toast_compression.h" /* * Prepare to TOAST a tuple. 
@@ -55,6 +56,18 @@ toast_tuple_init(ToastTupleContext *ttc) ttc->ttc_attr[i].tai_colflags = 0; ttc->ttc_attr[i].tai_oldexternal = NULL; ttc->ttc_attr[i].tai_compression = att->attcompression; + ttc->ttc_attr[i].dictid = InvalidDictId; + ttc->ttc_attr[i].zstd_level = DEFAULT_ZSTD_LEVEL; + if (att->attcompression == TOAST_ZSTD_COMPRESSION) + { + AttributeOpts *aopt = get_attribute_options(att->attrelid, att->attnum); + + if (aopt) + { + ttc->ttc_attr[i].dictid = (Oid) aopt->dictid; + ttc->ttc_attr[i].zstd_level = aopt->zstd_level; + } + } if (ttc->ttc_oldvalues != NULL) { @@ -230,7 +243,11 @@ toast_tuple_try_compression(ToastTupleContext *ttc, int attribute) Datum new_value; ToastAttrInfo *attr = &ttc->ttc_attr[attribute]; - new_value = toast_compress_datum(*value, attr->tai_compression); + CompressionInfo cmp = {.cmethod = attr->tai_compression, + .dictid = attr->dictid, + .zstd_level = attr->zstd_level}; + + new_value = toast_compress_datum(*value, cmp); if (DatumGetPointer(new_value) != NULL) { diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index c090094ed0..282afbcef5 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -46,7 +46,8 @@ OBJS = \ pg_subscription.o \ pg_type.o \ storage.o \ - toasting.o + toasting.o \ + pg_zstd_dictionaries.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index bd3554c0bf..493963b1b8 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1071,7 +1071,9 @@ AddNewRelationType(const char *typeName, -1, /* typmod */ 0, /* array dimensions for typBaseType */ false, /* Type NOT NULL */ - InvalidOid); /* rowtypes never have a collation */ + InvalidOid, /* rowtypes never have a collation */ + InvalidOid /* generate dictionary procedure - default */ + ); } /* -------------------------------- @@ -1394,7 +1396,9 @@ heap_create_with_catalog(const char *relname, -1, /* typmod */ 0, /* array dimensions for 
typBaseType */ false, /* Type NOT NULL */ - InvalidOid); /* rowtypes never have a collation */ + InvalidOid, /* rowtypes never have a collation */ + InvalidOid /* generate dictionary procedure - default */ + ); pfree(relarrayname); } diff --git a/src/backend/catalog/meson.build b/src/backend/catalog/meson.build index 1958ea9238..8f0413189c 100644 --- a/src/backend/catalog/meson.build +++ b/src/backend/catalog/meson.build @@ -34,6 +34,7 @@ backend_sources += files( 'pg_type.c', 'storage.c', 'toasting.c', + 'pg_zstd_dictionaries.c', ) diff --git a/src/backend/catalog/pg_type.c b/src/backend/catalog/pg_type.c index b36f81afb9..bbed8f64ad 100644 --- a/src/backend/catalog/pg_type.c +++ b/src/backend/catalog/pg_type.c @@ -120,6 +120,7 @@ TypeShellMake(const char *typeName, Oid typeNamespace, Oid ownerId) values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(-1); values[Anum_pg_type_typndims - 1] = Int32GetDatum(0); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_type_typebuildzstddictionary - 1] = ObjectIdGetDatum(InvalidOid); nulls[Anum_pg_type_typdefaultbin - 1] = true; nulls[Anum_pg_type_typdefault - 1] = true; nulls[Anum_pg_type_typacl - 1] = true; @@ -223,7 +224,8 @@ TypeCreate(Oid newTypeOid, int32 typeMod, int32 typNDims, /* Array dimensions for baseType */ bool typeNotNull, - Oid typeCollation) + Oid typeCollation, + Oid generateDictionaryProcedure) { Relation pg_type_desc; Oid typeObjectId; @@ -378,6 +380,7 @@ TypeCreate(Oid newTypeOid, values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(typeMod); values[Anum_pg_type_typndims - 1] = Int32GetDatum(typNDims); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(typeCollation); + values[Anum_pg_type_typebuildzstddictionary - 1] = ObjectIdGetDatum(generateDictionaryProcedure); /* * initialize the default binary value for this type. 
Check for nulls of @@ -679,6 +682,12 @@ GenerateTypeDependencies(HeapTuple typeTuple, add_exact_object_address(&referenced, addrs_normal); } + if (OidIsValid(typeForm->typebuildzstddictionary)) + { + ObjectAddressSet(referenced, ProcedureRelationId, typeForm->typebuildzstddictionary); + add_exact_object_address(&referenced, addrs_normal); + } + if (OidIsValid(typeForm->typsubscript)) { ObjectAddressSet(referenced, ProcedureRelationId, typeForm->typsubscript); diff --git a/src/backend/catalog/pg_zstd_dictionaries.c b/src/backend/catalog/pg_zstd_dictionaries.c new file mode 100644 index 0000000000..2d27f58221 --- /dev/null +++ b/src/backend/catalog/pg_zstd_dictionaries.c @@ -0,0 +1,566 @@ +#include "postgres.h" + +#include "fmgr.h" +#include "access/heapam.h" +#include "access/table.h" +#include "access/relation.h" +#include "access/tableam.h" +#include "catalog/catalog.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_class_d.h" +#include "catalog/pg_zstd_dictionaries.h" +#include "catalog/pg_zstd_dictionaries_d.h" +#include "catalog/pg_type.h" +#include "catalog/namespace.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/syscache.h" +#include "utils/hsearch.h" +#include "access/toast_compression.h" +#include "utils/attoptcache.h" +#include "parser/analyze.h" +#include "common/hashfn.h" +#include "nodes/makefuncs.h" +#include "access/reloptions.h" +#include "miscadmin.h" +#include "access/genam.h" +#include "executor/tuptable.h" +#include "access/htup_details.h" +#include "access/sdir.h" +#include "utils/lsyscache.h" +#include "utils/relcache.h" +#include "utils/memutils.h" +#include "utils/varlena.h" +#include "nodes/pg_list.h" + +#ifdef USE_ZSTD +#include +#include +#endif + +#define TARG_ROWS 1000 + +typedef struct SampleEntry SampleEntry; +typedef struct SampleCollector SampleCollector; + +/* Structure to store a sample entry */ +struct SampleEntry +{ + void *data; /* 
Pointer to sample data */ + size_t size; /* Size of the sample */ +}; + +/* Structure to collect samples along with a hash table for deduplication */ +struct SampleCollector +{ + SampleEntry *samples; /* Dynamic array of pointers to SampleEntry */ + int sample_count; /* Number of collected samples */ +}; + +static bool build_zstd_dictionary_internal(Oid relid, AttrNumber attno); +static Oid GetNewDictId(Relation relation, Oid indexId, AttrNumber dictIdColumn); + +/* ---------------------------------------------------------------- + * Zstandard dictionary training related methods + * ---------------------------------------------------------------- + */ + +/* + * build_zstd_dictionary_internal + * 1) Validate that the given (relid, attno) can have a Zstd compression enabled on heap relation + * 2) Call the type-specific dictionary builder + * 3) Train a dictionary via ZDICT_trainFromBuffer() + * 4) Insert dictionary into pg_zstd_dictionaries + * 5) Update pg_attribute.attoptions with dictid + */ +pg_attribute_unused() +static bool +build_zstd_dictionary_internal(Oid relid, AttrNumber attno) +{ +#ifdef USE_ZSTD + Relation catalogRel; + TupleDesc catTupDesc; + Oid dictid; + Relation rel; + TupleDesc tupleDesc; + Form_pg_attribute att; + AttributeOpts *attopt; + HeapTuple typeTup; + Form_pg_type typeForm; + Oid baseTypeOid; + Oid train_func; + Datum dictDatum; + ZstdTrainingData *dict; + char *samples_buffer; + size_t *sample_sizes; + size_t nitems; + uint32 dictionary_size; + void *dict_data; + size_t dict_size; + + /* ---- + * 1) Open user relation just to verify it's a normal table and has Zstd compression + * ---- + */ + rel = table_open(relid, AccessShareLock); + if (rel->rd_rel->relkind != RELKIND_RELATION) + { + table_close(rel, AccessShareLock); + return false; /* not a regular table */ + } + + /* If the column doesn't use Zstd, nothing to do */ + tupleDesc = RelationGetDescr(rel); + att = TupleDescAttr(tupleDesc, attno - 1); + if (att->attcompression != 
TOAST_ZSTD_COMPRESSION) + { + table_close(rel, AccessShareLock); + return false; + } + + /* Check attoptions for user-requested dictionary size, etc. */ + attopt = get_attribute_options(relid, attno); + if (attopt && attopt->zstd_dict_size == 0) + { + /* user explicitly says "no dictionary needed" */ + table_close(rel, AccessShareLock); + return false; + } + + /* + * 2) Look up the type's custom dictionary builder function We'll call it + * to get sample data. Then we can close 'rel' because we don't need it + * open to do the actual Zdict training. + */ + typeTup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); + if (!HeapTupleIsValid(typeTup)) + { + table_close(rel, AccessShareLock); + elog(ERROR, "cache lookup failed for type %u", att->atttypid); + } + typeForm = (Form_pg_type) GETSTRUCT(typeTup); + + if (typeForm->typlen != -1) + { + ReleaseSysCache(typeTup); + table_close(rel, AccessShareLock); + return false; + } + + /* Get the base type */ + baseTypeOid = get_element_type(typeForm->oid); + train_func = InvalidOid; + + if (OidIsValid(baseTypeOid)) + { + HeapTuple baseTypeTup; + Form_pg_type baseTypeForm; + + /* It's an array type: get the base type's training function */ + baseTypeTup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(baseTypeOid)); + if (!HeapTupleIsValid(baseTypeTup)) + ereport(ERROR, + (errmsg("Cache lookup failed for base type %u", baseTypeOid))); + + baseTypeForm = (Form_pg_type) GETSTRUCT(baseTypeTup); + train_func = baseTypeForm->typebuildzstddictionary; + ReleaseSysCache(baseTypeTup); + } + else + train_func = typeForm->typebuildzstddictionary; + + /* If the type does not supply a builder, skip */ + if (!OidIsValid(train_func)) + { + ReleaseSysCache(typeTup); + table_close(rel, AccessShareLock); + return false; + } + + /* Call the type-specific builder. 
It should return ZstdTrainingData */ + dictDatum = OidFunctionCall2(train_func, + PointerGetDatum(rel), /* pass relation ref */ + PointerGetDatum(att)); + ReleaseSysCache(typeTup); + + /* We no longer need the user relation open */ + table_close(rel, AccessShareLock); + + dict = (ZstdTrainingData *) DatumGetPointer(dictDatum); + if (!dict || dict->nitems == 0) + return false; + + /* + * 3) Train a Zstd dictionary in-memory. + */ + samples_buffer = dict->sample_buffer; + sample_sizes = dict->sample_sizes; + nitems = dict->nitems; + + dictionary_size = (!attopt ? DEFAULT_ZSTD_DICT_SIZE + : attopt->zstd_dict_size); + + /* Allocate buffer for dictionary training result */ + dict_data = palloc(dictionary_size); + dict_size = ZDICT_trainFromBuffer(dict_data, + dictionary_size, + samples_buffer, + sample_sizes, + nitems); + if (ZDICT_isError(dict_size)) + { + elog(LOG, "Zstd dictionary training failed: %s", + ZDICT_getErrorName(dict_size)); + pfree(dict_data); + return false; + } + + /* Open the catalog relation with ShareRowExclusiveLock */ + catalogRel = table_open(ZstdDictionariesRelationId, ShareRowExclusiveLock); + catTupDesc = RelationGetDescr(catalogRel); + dictid = GetNewDictId(catalogRel, ZstdDictidIndexId, Anum_pg_zstd_dictionaries_dictid); + + /* Now copy that finalized dictionary into a bytea. */ + { + /* We’ll store this bytea in pg_zstd_dictionaries. 
*/ + Datum values[Natts_pg_zstd_dictionaries]; + bool nulls[Natts_pg_zstd_dictionaries]; + HeapTuple tup; + + bytea *dict_bytea = (bytea *) palloc(VARHDRSZ + dict_size); + + SET_VARSIZE(dict_bytea, VARHDRSZ + dict_size); + memcpy(VARDATA(dict_bytea), dict_data, dict_size); + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, false, sizeof(nulls)); + + values[Anum_pg_zstd_dictionaries_dictid - 1] = ObjectIdGetDatum(dictid); + values[Anum_pg_zstd_dictionaries_dict - 1] = PointerGetDatum(dict_bytea); + + tup = heap_form_tuple(catTupDesc, values, nulls); + CatalogTupleInsert(catalogRel, tup); + heap_freetuple(tup); + + pfree(dict_bytea); + } + + pfree(dict_data); + pfree(samples_buffer); + pfree(sample_sizes); + pfree(dict); + + /* + * 5) Update pg_attribute.attoptions with "dictid" => dictid so the column + * knows which dictionary to use at compression time. + */ + { + Relation attRel = table_open(AttributeRelationId, RowExclusiveLock); + HeapTuple atttup, + newtuple; + Datum attoptionsDatum, + newOptions; + bool isnull; + Datum repl_val[Natts_pg_attribute]; + bool repl_null[Natts_pg_attribute]; + bool repl_repl[Natts_pg_attribute]; + DefElem *def; + + atttup = SearchSysCacheAttNum(relid, attno); + if (!HeapTupleIsValid(atttup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column number %d of relation \"%u\" does not exist", + attno, relid))); + + /* Build new attoptions with dictid=... */ + def = makeDefElem("dictid", + (Node *) makeString(psprintf("%u", dictid)), + -1); + + attoptionsDatum = SysCacheGetAttr(ATTNUM, atttup, + Anum_pg_attribute_attoptions, + &isnull); + newOptions = transformRelOptions(isnull ? 
(Datum) 0 : attoptionsDatum, + list_make1(def), + NULL, NULL, + false, false); + /* Validate them (throws error if invalid) */ + (void) attribute_reloptions(newOptions, true); + + MemSet(repl_null, false, sizeof(repl_null)); + MemSet(repl_repl, false, sizeof(repl_repl)); + + if (newOptions != (Datum) 0) + repl_val[Anum_pg_attribute_attoptions - 1] = newOptions; + else + repl_null[Anum_pg_attribute_attoptions - 1] = true; + + repl_repl[Anum_pg_attribute_attoptions - 1] = true; + + newtuple = heap_modify_tuple(atttup, + RelationGetDescr(attRel), + repl_val, + repl_null, + repl_repl); + + CatalogTupleUpdate(attRel, &newtuple->t_self, newtuple); + heap_freetuple(newtuple); + + ReleaseSysCache(atttup); + + table_close(attRel, NoLock); + } + + /** + * Done inserting dictionary and updating attribute. + * Unlock the table (locks remain held until transaction commit) + */ + table_close(catalogRel, NoLock); + + return true; +#else + return false; +#endif +} + +/* + * Acquire a new unique DictId for a relation. + * + * Assumes the relation is already locked with ShareRowExclusiveLock, + * ensuring that concurrent transactions cannot generate duplicate DictIds. 
+ */
+pg_attribute_unused()
+static Oid
+GetNewDictId(Relation relation, Oid indexId, AttrNumber dictIdColumn)
+{
+	Relation	indexRel = index_open(indexId, AccessShareLock);
+	Oid			maxDictId = InvalidDictId;
+	SysScanDesc scan;
+	HeapTuple	tuple;
+	bool		collision;
+	ScanKeyData key;
+	Oid			newDictId;
+
+	/* Retrieve the maximum existing DictId by scanning in reverse order */
+	scan = systable_beginscan_ordered(relation, indexRel, SnapshotAny, 0, NULL);
+	tuple = systable_getnext_ordered(scan, BackwardScanDirection);
+	if (HeapTupleIsValid(tuple))
+	{
+		Datum		value;
+		bool		isNull;
+
+		value = heap_getattr(tuple, dictIdColumn, RelationGetDescr(relation), &isNull);
+		if (!isNull)
+			maxDictId = DatumGetObjectId(value);
+	}
+	/* An ordered scan must be ended with the _ordered variant */
+	systable_endscan_ordered(scan);
+
+	newDictId = maxDictId + 1;
+	Assert(newDictId != InvalidDictId);
+
+	/* Check that the new DictId is indeed unique */
+	ScanKeyInit(&key,
+				dictIdColumn,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(newDictId));
+
+	scan = systable_beginscan(relation, indexId, true,
+							  SnapshotAny, 1, &key);
+	collision = HeapTupleIsValid(systable_getnext(scan));
+	systable_endscan(scan);
+
+	/* Release the index opened above; keep the lock until commit */
+	index_close(indexRel, NoLock);
+
+	if (collision)
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("unexpected collision for new DictId %u", newDictId)));
+
+	return newDictId;
+}
+
+/*
+ * get_zstd_dict - Fetches the ZSTD dictionary from the catalog
+ *
+ * dictid: The Oid of the dictionary to fetch.
+ *
+ * Returns: A pointer to a bytea containing the dictionary data.
+ */
+bytea *
+get_zstd_dict(Oid dictid)
+{
+	HeapTuple	tuple;
+	Datum		datum;
+	bool		isNull;
+	bytea	   *dict_bytea;
+	bytea	   *result;
+	Size		bytea_len;
+
+	/* Fetch the dictionary tuple from the syscache */
+	tuple = SearchSysCache1(ZSTDDICTIDOID, ObjectIdGetDatum(dictid));
+	if (!HeapTupleIsValid(tuple))
+		ereport(ERROR, (errmsg("Cache lookup failed for dictid %u", dictid)));
+
+	/*
+	 * Get the dictionary attribute from the tuple.  The cache ID passed here
+	 * must be the one the tuple was fetched from (ZSTDDICTIDOID), so that
+	 * SysCacheGetAttr resolves the attribute number against the
+	 * pg_zstd_dictionaries tuple descriptor, not pg_attribute's.
+	 */
+	datum = SysCacheGetAttr(ZSTDDICTIDOID, tuple, Anum_pg_zstd_dictionaries_dict, &isNull);
+	if (isNull)
+		ereport(ERROR, (errmsg("Dictionary not found for dictid %u", dictid)));
+
+	dict_bytea = DatumGetByteaP(datum);
+	if (dict_bytea == NULL)
+		ereport(ERROR, (errmsg("Failed to fetch dictionary")));
+
+	/* Determine the total size of the bytea (header + data) */
+	bytea_len = VARSIZE(dict_bytea);
+
+	/* Copy so the result survives ReleaseSysCache of the source tuple */
+	result = palloc(bytea_len);
+	memcpy(result, dict_bytea, bytea_len);
+
+	/* Release the syscache tuple; the returned bytea is now independent */
+	ReleaseSysCache(tuple);
+
+	return result;
+}
+
+/*
+ * zstd_dictionary_builder
+ *		Acquire samples from a column, store them in a SampleCollector,
+ *		filter them, then build a ZstdTrainingData struct.
+ */
+Datum
+zstd_dictionary_builder(PG_FUNCTION_ARGS)
+{
+	ZstdTrainingData *dict = palloc0(sizeof(ZstdTrainingData));
+	Relation	rel = (Relation) PG_GETARG_POINTER(0);
+	Form_pg_attribute att = (Form_pg_attribute) PG_GETARG_POINTER(1);
+	TupleDesc	tupleDesc = RelationGetDescr(rel);
+
+	/* Acquire up to TARG_ROWS sample rows. */
+	HeapTuple  *sample_rows = palloc(TARG_ROWS * sizeof(HeapTuple));
+	double		totalrows = 0,
+				totaldeadrows = 0;
+	int			num_sampled = acquire_sample_rows(rel, 0, sample_rows,
+												  TARG_ROWS,
+												  &totalrows,
+												  &totaldeadrows);
+
+	/* Create a collector to accumulate raw varlena samples.
*/ + size_t filtered_sample_count = 0; + size_t filtered_samples_size = 0; + char *samples_buffer; + size_t *sample_sizes; + size_t current_offset; + SampleCollector *collector; + + if (num_sampled == 0) + { + pfree(sample_rows); + /* No samples were collected. */ + PG_RETURN_POINTER(dict); + } + + collector = palloc(sizeof(SampleCollector)); + collector->samples = palloc(num_sampled * sizeof(SampleEntry)); + collector->sample_count = 0; + + /* Extract column data from each sampled row. */ + for (int i = 0; i < num_sampled; i++) + { + bool isnull; + Datum value; + + CHECK_FOR_INTERRUPTS(); + + value = heap_getattr(sample_rows[i], + att->attnum, + tupleDesc, + &isnull); + if (!isnull) + { + struct varlena *attr; + size_t size; + void *data; + SampleEntry entry; + int idx; + + attr = (struct varlena *) PG_DETOAST_DATUM(value); + size = VARSIZE_ANY_EXHDR(attr); + + if (filtered_samples_size + size > MaxAllocSize) + break; + + data = palloc(size); + memcpy(data, VARDATA_ANY(attr), size); + + entry.data = data; + entry.size = size; + + idx = collector->sample_count; + collector->samples[idx] = entry; + collector->sample_count++; + + filtered_samples_size += size; + filtered_sample_count++; + } + } + + if (filtered_sample_count == 0) + { + pfree(sample_rows); + pfree(collector->samples); + pfree(collector); + /* No samples were collected, or they were too large. */ + PG_RETURN_POINTER(dict); + } + + /* Allocate a buffer for all sample data, plus an array of sample sizes. */ + samples_buffer = palloc(filtered_samples_size); + sample_sizes = palloc(filtered_sample_count * sizeof(size_t)); + + /* + * Concatenate the samples into samples_buffer, recording each sample's + * size in sample_sizes. 
+ */ + current_offset = 0; + for (int i = 0; i < filtered_sample_count; i++) + { + memcpy(samples_buffer + current_offset, + collector->samples[i].data, + collector->samples[i].size); + + sample_sizes[i] = collector->samples[i].size; + current_offset += collector->samples[i].size; + + pfree(collector->samples[i].data); + } + pfree(sample_rows); + pfree(collector->samples); + pfree(collector); + + dict->sample_buffer = samples_buffer; + dict->sample_sizes = sample_sizes; + dict->nitems = filtered_sample_count; + + PG_RETURN_POINTER(dict); +} + +Datum +build_zstd_dict_for_attribute(PG_FUNCTION_ARGS) +{ +#ifndef USE_ZSTD + PG_RETURN_BOOL(false); +#else + text *tablename = PG_GETARG_TEXT_PP(0); + RangeVar *tablerel; + Oid tableoid = InvalidOid; + AttrNumber attno = PG_GETARG_INT32(1); + bool success; + + /* Look up table name. */ + tablerel = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); + tableoid = RangeVarGetRelid(tablerel, NoLock, false); + success = build_zstd_dictionary_internal(tableoid, attno); + PG_RETURN_BOOL(success); +#endif +} diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 2b5fbdcbd8..2b5500f45f 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -55,7 +55,7 @@ #include "utils/sortsupport.h" #include "utils/syscache.h" #include "utils/timestamp.h" - +#include "parser/analyze.h" /* Per-index data for ANALYZE */ typedef struct AnlIndexData @@ -85,9 +85,6 @@ static void compute_index_stats(Relation onerel, double totalrows, MemoryContext col_context); static VacAttrStats *examine_attribute(Relation onerel, int attnum, Node *index_expr); -static int acquire_sample_rows(Relation onerel, int elevel, - HeapTuple *rows, int targrows, - double *totalrows, double *totaldeadrows); static int compare_rows(const void *a, const void *b, void *arg); static int acquire_inherited_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, @@ -1195,7 +1192,7 @@ 
block_sampling_read_stream_next(ReadStream *stream, * block. The previous sampling method put too much credence in the row * density near the start of the table. */ -static int +int acquire_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows) diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 3cb3ca1cca..c583e48167 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -95,6 +95,7 @@ typedef struct bool updateTypmodout; bool updateAnalyze; bool updateSubscript; + bool updateGenerateDictionary; /* New values for relevant attributes */ char storage; Oid receiveOid; @@ -103,6 +104,7 @@ typedef struct Oid typmodoutOid; Oid analyzeOid; Oid subscriptOid; + Oid buildZstdDictionary; } AlterTypeRecurseParams; /* Potentially set by pg_upgrade_support functions */ @@ -122,6 +124,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid); static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); +static Oid findTypeGenerateDictionaryFunction(List *procname, Oid typeOid); static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid); static Oid findRangeSubOpclass(List *opcname, Oid subtype); static Oid findRangeCanonicalFunction(List *procname, Oid typeOid); @@ -162,6 +165,7 @@ DefineType(ParseState *pstate, List *names, List *parameters) List *typmodoutName = NIL; List *analyzeName = NIL; List *subscriptName = NIL; + List *generateDictionaryName = NIL; char category = TYPCATEGORY_USER; bool preferred = false; char delimiter = DEFAULT_TYPDELIM; @@ -190,6 +194,7 @@ DefineType(ParseState *pstate, List *names, List *parameters) DefElem *alignmentEl = NULL; DefElem *storageEl = NULL; DefElem *collatableEl = NULL; + DefElem *generateDictionaryEl = NULL; Oid inputOid; Oid outputOid; Oid receiveOid = InvalidOid; @@ -198,6 +203,7 @@ 
DefineType(ParseState *pstate, List *names, List *parameters) Oid typmodoutOid = InvalidOid; Oid analyzeOid = InvalidOid; Oid subscriptOid = InvalidOid; + Oid buildZstdDictionary = InvalidOid; char *array_type; Oid array_oid; Oid typoid; @@ -323,6 +329,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) defelp = &storageEl; else if (strcmp(defel->defname, "collatable") == 0) defelp = &collatableEl; + else if (strcmp(defel->defname, "build_zstd_dict") == 0) + defelp = &generateDictionaryEl; else { /* WARNING, not ERROR, for historical backwards-compatibility */ @@ -455,6 +463,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) } if (collatableEl) collation = defGetBoolean(collatableEl) ? DEFAULT_COLLATION_OID : InvalidOid; + if (generateDictionaryEl) + generateDictionaryName = defGetQualifiedName(generateDictionaryEl); /* * make sure we have our required definitions @@ -516,6 +526,15 @@ DefineType(ParseState *pstate, List *names, List *parameters) errmsg("element type cannot be specified without a subscripting function"))); } + if (generateDictionaryName) + { + if (internalLength != -1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type build_zstd_dict function must be specified only if data type is variable length."))); + buildZstdDictionary = findTypeGenerateDictionaryFunction(generateDictionaryName, typoid); + } + /* * Check permissions on functions. We choose to require the creator/owner * of a type to also own the underlying functions. 
Since creating a type @@ -550,6 +569,9 @@ DefineType(ParseState *pstate, List *names, List *parameters) if (analyzeOid && !object_ownercheck(ProcedureRelationId, analyzeOid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, NameListToString(analyzeName)); + if (buildZstdDictionary && !object_ownercheck(ProcedureRelationId, buildZstdDictionary, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, + NameListToString(generateDictionaryName)); if (subscriptOid && !object_ownercheck(ProcedureRelationId, subscriptOid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_FUNCTION, NameListToString(subscriptName)); @@ -601,7 +623,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) -1, /* typMod (Domains only) */ 0, /* Array Dimensions of typbasetype */ false, /* Type NOT NULL */ - collation); /* type's collation */ + collation, /* type's collation */ + buildZstdDictionary); /* build_zstd_dict procedure */ Assert(typoid == address.objectId); /* @@ -643,7 +666,8 @@ DefineType(ParseState *pstate, List *names, List *parameters) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - collation); /* type's collation */ + collation, /* type's collation */ + InvalidOid); /* build_zstd_dict procedure */ pfree(array_type); @@ -706,6 +730,7 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) Oid receiveProcedure; Oid sendProcedure; Oid analyzeProcedure; + Oid buildZstdDictionary; bool byValue; char category; char delimiter; @@ -842,6 +867,9 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) /* Analysis function */ analyzeProcedure = baseType->typanalyze; + /* Generate dictionary function */ + buildZstdDictionary = baseType->typebuildzstddictionary; + /* * Domains don't need a subscript function, since they are not * subscriptable on their own. 
If the base type is subscriptable, the @@ -1078,7 +1106,8 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) basetypeMod, /* typeMod value */ typNDims, /* Array dimensions for base type */ typNotNull, /* Type NOT NULL */ - domaincoll); /* type's collation */ + domaincoll, /* type's collation */ + buildZstdDictionary); /* build_zstd_dict procedure */ /* * Create the array type that goes with it. @@ -1119,7 +1148,8 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - domaincoll); /* type's collation */ + domaincoll, /* type's collation */ + InvalidOid); /* build_zstd_dict procedure */ pfree(domainArrayName); @@ -1241,7 +1271,8 @@ DefineEnum(CreateEnumStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation */ + InvalidOid, /* type's collation */ + InvalidOid); /* generate dictionary procedure - default */ /* Enter the enum's values into pg_enum */ EnumValuesCreate(enumTypeAddr.objectId, stmt->vals); @@ -1282,7 +1313,8 @@ DefineEnum(CreateEnumStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation */ + InvalidOid, /* type's collation */ + InvalidOid); /* generate dictionary procedure - default */ pfree(enumArrayName); @@ -1583,7 +1615,8 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation (ranges never have one) */ + InvalidOid, /* type's collation (ranges never have one) */ + InvalidOid); /* generate dictionary procedure - default */ Assert(typoid == InvalidOid || typoid == address.objectId); typoid = address.objectId; @@ -1650,7 +1683,8 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* 
Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* type's collation (ranges never have one) */ + InvalidOid, /* type's collation (ranges never have one) */ + InvalidOid); /* generate dictionary procedure - default */ Assert(multirangeOid == mltrngaddress.objectId); /* Create the entry in pg_range */ @@ -1693,7 +1727,8 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* typcollation */ + InvalidOid, /* typcollation */ + InvalidOid); /* generate dictionary procedure - default */ pfree(rangeArrayName); @@ -1732,7 +1767,8 @@ DefineRange(ParseState *pstate, CreateRangeStmt *stmt) -1, /* typMod (Domains only) */ 0, /* Array dimensions of typbasetype */ false, /* Type NOT NULL */ - InvalidOid); /* typcollation */ + InvalidOid, /* typcollation */ + InvalidOid); /* generate dictionary procedure - default */ /* And create the constructor functions for this range type */ makeRangeConstructors(typeName, typeNamespace, typoid, rangeSubtype); @@ -2257,6 +2293,31 @@ findTypeAnalyzeFunction(List *procname, Oid typeOid) return procOid; } +static Oid +findTypeGenerateDictionaryFunction(List *procname, Oid typeOid) +{ + Oid argList[2]; + Oid procOid; + + argList[0] = OIDOID; + argList[1] = INT4OID; + + procOid = LookupFuncName(procname, 2, argList, true); + if (!OidIsValid(procOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("function %s does not exist", + func_signature_string(procname, 1, NIL, argList)))); + + if (get_func_rettype(procOid) != INTERNALOID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type build zstd dictionary function %s must return type %s", + NameListToString(procname), "internal"))); + + return procOid; +} + static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid) { @@ -4440,6 +4501,19 @@ AlterType(AlterTypeStmt *stmt) /* Replacing a subscript 
function requires superuser. */ requireSuper = true; } + else if (strcmp(defel->defname, "build_zstd_dict") == 0) + { + if (defel->arg != NULL) + atparams.buildZstdDictionary = + findTypeGenerateDictionaryFunction(defGetQualifiedName(defel), + typeOid); + else + atparams.buildZstdDictionary = InvalidOid; /* NONE, remove function */ + + atparams.updateGenerateDictionary = true; + /* Replacing a canonical function requires superuser. */ + requireSuper = true; + } /* * The rest of the options that CREATE accepts cannot be changed. @@ -4602,6 +4676,11 @@ AlterTypeRecurse(Oid typeOid, bool isImplicitArray, replaces[Anum_pg_type_typsubscript - 1] = true; values[Anum_pg_type_typsubscript - 1] = ObjectIdGetDatum(atparams->subscriptOid); } + if (atparams->updateGenerateDictionary) + { + replaces[Anum_pg_type_typebuildzstddictionary - 1] = true; + values[Anum_pg_type_typebuildzstddictionary - 1] = ObjectIdGetDatum(atparams->buildZstdDictionary); + } newtup = heap_modify_tuple(tup, RelationGetDescr(catalog), values, nulls, replaces); diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index cdf185ea00..36f32f8590 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -5280,6 +5280,9 @@ pg_column_compression(PG_FUNCTION_ARGS) case TOAST_LZ4_COMPRESSION_ID: result = "lz4"; break; + case TOAST_ZSTD_COMPRESSION_ID: + result = "zstd"; + break; default: elog(ERROR, "invalid compression method id %d", cmid); } diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index ad25cbb39c..e03ac8dddc 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -453,6 +453,9 @@ static const struct config_enum_entry default_toast_compression_options[] = { {"pglz", TOAST_PGLZ_COMPRESSION, false}, #ifdef USE_LZ4 {"lz4", TOAST_LZ4_COMPRESSION, false}, +#endif +#ifdef USE_ZSTD + {"zstd", TOAST_ZSTD_COMPRESSION, false}, #endif {NULL, 0, false} }; diff --git 
a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 2d1de9c37b..47773e2919 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -731,7 +731,7 @@ autovacuum_worker_slots = 16 # autovacuum worker slots to allocate #row_security = on #default_table_access_method = 'heap' #default_tablespace = '' # a tablespace name, '' uses the default -#default_toast_compression = 'pglz' # 'pglz' or 'lz4' +#default_toast_compression = 'pglz' # 'pglz' or 'lz4' or 'zstd' #temp_tablespaces = '' # a list of tablespace names, '' uses # only default tablespace #check_function_bodies = on diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 4f4ad2ee15..c2638fe4d8 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -8965,7 +8965,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) "a.attalign,\n" "a.attislocal,\n" "pg_catalog.format_type(t.oid, a.atttypmod) AS atttypname,\n" - "array_to_string(a.attoptions, ', ') AS attoptions,\n" + "array_to_string(ARRAY(SELECT x FROM unnest(a.attoptions) AS x \n" + "WHERE x NOT LIKE 'dictid=%'), ', ') AS attoptions, \n" "CASE WHEN a.attcollation <> t.typcollation " "THEN a.attcollation ELSE 0 END AS attcollation,\n" "pg_catalog.array_to_string(ARRAY(" @@ -11784,12 +11785,14 @@ dumpBaseType(Archive *fout, const TypeInfo *tyinfo) char *typmodout; char *typanalyze; char *typsubscript; + char *typebuildzstddictionary; Oid typreceiveoid; Oid typsendoid; Oid typmodinoid; Oid typmodoutoid; Oid typanalyzeoid; Oid typsubscriptoid; + Oid typebuildzstddictionaryoid; char *typcategory; char *typispreferred; char *typdelim; @@ -11822,10 +11825,18 @@ dumpBaseType(Archive *fout, const TypeInfo *tyinfo) if (fout->remoteVersion >= 140000) appendPQExpBufferStr(query, "typsubscript, " - "typsubscript::pg_catalog.oid AS typsubscriptoid "); + "typsubscript::pg_catalog.oid AS typsubscriptoid, "); else 
appendPQExpBufferStr(query, - "'-' AS typsubscript, 0 AS typsubscriptoid "); + "'-' AS typsubscript, 0 AS typsubscriptoid, "); + + if (fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, + "typebuildzstddictionary, " + "typebuildzstddictionary::pg_catalog.oid AS typebuildzstddictionaryoid "); + else + appendPQExpBufferStr(query, + "'-' AS typebuildzstddictionary, 0 AS typebuildzstddictionaryoid "); appendPQExpBufferStr(query, "FROM pg_catalog.pg_type " "WHERE oid = $1"); @@ -11850,12 +11861,14 @@ dumpBaseType(Archive *fout, const TypeInfo *tyinfo) typmodout = PQgetvalue(res, 0, PQfnumber(res, "typmodout")); typanalyze = PQgetvalue(res, 0, PQfnumber(res, "typanalyze")); typsubscript = PQgetvalue(res, 0, PQfnumber(res, "typsubscript")); + typebuildzstddictionary = PQgetvalue(res, 0, PQfnumber(res, "typebuildzstddictionary")); typreceiveoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typreceiveoid"))); typsendoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typsendoid"))); typmodinoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typmodinoid"))); typmodoutoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typmodoutoid"))); typanalyzeoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typanalyzeoid"))); typsubscriptoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typsubscriptoid"))); + typebuildzstddictionaryoid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typebuildzstddictionaryoid"))); typcategory = PQgetvalue(res, 0, PQfnumber(res, "typcategory")); typispreferred = PQgetvalue(res, 0, PQfnumber(res, "typispreferred")); typdelim = PQgetvalue(res, 0, PQfnumber(res, "typdelim")); @@ -11911,7 +11924,8 @@ dumpBaseType(Archive *fout, const TypeInfo *tyinfo) appendPQExpBuffer(q, ",\n TYPMOD_OUT = %s", typmodout); if (OidIsValid(typanalyzeoid)) appendPQExpBuffer(q, ",\n ANALYZE = %s", typanalyze); - + if (OidIsValid(typebuildzstddictionaryoid)) + appendPQExpBuffer(q, ",\n BUILD_ZSTD_DICT = %s", typebuildzstddictionary); if (strcmp(typcollatable, "t") == 0) 
appendPQExpBufferStr(q, ",\n COLLATABLE = true"); @@ -17170,6 +17184,9 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo) case 'l': cmname = "lz4"; break; + case 'z': + cmname = "zstd"; + break; default: cmname = NULL; break; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index e6cf468ac9..0ba37bb175 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2167,8 +2167,9 @@ describeOneTableDetails(const char *schemaname, /* these strings are literal in our syntax, so not translated. */ printTableAddCell(&cont, (compression[0] == 'p' ? "pglz" : (compression[0] == 'l' ? "lz4" : - (compression[0] == '\0' ? "" : - "???"))), + (compression[0] == 'z' ? "zstd" : + (compression[0] == '\0' ? "" : + "???")))), false, false); } diff --git a/src/include/access/toast_compression.h b/src/include/access/toast_compression.h index 13c4612cee..ddea1e0bcd 100644 --- a/src/include/access/toast_compression.h +++ b/src/include/access/toast_compression.h @@ -38,7 +38,8 @@ typedef enum ToastCompressionId { TOAST_PGLZ_COMPRESSION_ID = 0, TOAST_LZ4_COMPRESSION_ID = 1, - TOAST_INVALID_COMPRESSION_ID = 2, + TOAST_ZSTD_COMPRESSION_ID = 2, + TOAST_INVALID_COMPRESSION_ID = 3 } ToastCompressionId; /* @@ -48,10 +49,15 @@ typedef enum ToastCompressionId */ #define TOAST_PGLZ_COMPRESSION 'p' #define TOAST_LZ4_COMPRESSION 'l' +#define TOAST_ZSTD_COMPRESSION 'z' #define InvalidCompressionMethod '\0' #define CompressionMethodIsValid(cm) ((cm) != InvalidCompressionMethod) +#define InvalidDictId 0 +#define DEFAULT_ZSTD_LEVEL 3 /* Reffered from + * ZSTD_CLEVEL_DEFAULT */ +#define DEFAULT_ZSTD_DICT_SIZE (4 * 1024) /* 4 KB */ /* pglz compression/decompression routines */ extern struct varlena *pglz_compress_datum(const struct varlena *value); @@ -65,6 +71,11 @@ extern struct varlena *lz4_decompress_datum(const struct varlena *value); extern struct varlena *lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength); +/* zstd compression/decompression 
routines */ +extern struct varlena *zstd_compress_datum(const struct varlena *value, Oid dictid, int zstd_level); +extern struct varlena *zstd_decompress_datum(const struct varlena *value); +extern struct varlena *zstd_decompress_datum_slice(const struct varlena *value, int32 slicelength); + /* other stuff */ extern ToastCompressionId toast_get_compression_id(struct varlena *attr); extern char CompressionNameToMethod(const char *compression); diff --git a/src/include/access/toast_helper.h b/src/include/access/toast_helper.h index e6ab8afffb..08bf3dfc67 100644 --- a/src/include/access/toast_helper.h +++ b/src/include/access/toast_helper.h @@ -33,6 +33,8 @@ typedef struct int32 tai_size; uint8 tai_colflags; char tai_compression; + Oid dictid; + int zstd_level; } ToastAttrInfo; /* diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h index 06ae8583c1..9ba6a1e64a 100644 --- a/src/include/access/toast_internals.h +++ b/src/include/access/toast_internals.h @@ -27,25 +27,54 @@ typedef struct toast_compress_header * external size; see va_extinfo */ } toast_compress_header; +typedef struct toast_compress_header_ext +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + uint32 tcinfo; /* 2 bits for compression method and 30 bits + * external size; see va_extinfo */ + uint32 ext_alg; /* compression method */ + uint32 dictid; /* Dictionary Id */ +} toast_compress_header_ext; + +typedef struct CompressionInfo +{ + char cmethod; + Oid dictid; + int zstd_level; /* ZSTD compression level */ +} CompressionInfo; + /* * Utilities for manipulation of header information for compressed * toast entries. 
*/ #define TOAST_COMPRESS_EXTSIZE(ptr) \ (((toast_compress_header *) (ptr))->tcinfo & VARLENA_EXTSIZE_MASK) -#define TOAST_COMPRESS_METHOD(ptr) \ - (((toast_compress_header *) (ptr))->tcinfo >> VARLENA_EXTSIZE_BITS) - -#define TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(ptr, len, cm_method) \ - do { \ - Assert((len) > 0 && (len) <= VARLENA_EXTSIZE_MASK); \ - Assert((cm_method) == TOAST_PGLZ_COMPRESSION_ID || \ - (cm_method) == TOAST_LZ4_COMPRESSION_ID); \ - ((toast_compress_header *) (ptr))->tcinfo = \ - (len) | ((uint32) (cm_method) << VARLENA_EXTSIZE_BITS); \ +#define TOAST_COMPRESS_METHOD(PTR) \ + ( ((((toast_compress_header *) (PTR))->tcinfo >> VARLENA_EXTSIZE_BITS) == VARLENA_EXTENDED_COMPRESSION_FLAG ) \ + ? (((toast_compress_header_ext *) (PTR))->ext_alg) \ + : ( (((toast_compress_header *) (PTR))->tcinfo) >> VARLENA_EXTSIZE_BITS ) ) + +#define TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(ptr, len, cm_method, dictid) \ + do { \ + Assert((len) > 0 && (len) <= VARLENA_EXTSIZE_MASK); \ + Assert((cm_method) == TOAST_PGLZ_COMPRESSION_ID || \ + (cm_method) == TOAST_LZ4_COMPRESSION_ID || \ + (cm_method) == TOAST_ZSTD_COMPRESSION_ID); \ + /* If the compression method is less than TOAST_ZSTD_COMPRESSION_ID, don't use ext_alg */ \ + if ((cm_method) < TOAST_ZSTD_COMPRESSION_ID) { \ + ((toast_compress_header *) (ptr))->tcinfo = \ + (len) | ((uint32) (cm_method) << VARLENA_EXTSIZE_BITS); \ + } else { \ + /* For compression methods after lz4, use 'VARLENA_EXTENDED_COMPRESSION_FLAG' \ + in the top bits of tcinfo to indicate compression algorithm is stored in ext_alg. 
*/ \ + ((toast_compress_header_ext *) (ptr))->tcinfo = \ + (len) | ((uint32)VARLENA_EXTENDED_COMPRESSION_FLAG << VARLENA_EXTSIZE_BITS); \ + ((toast_compress_header_ext *) (ptr))->ext_alg = (cm_method); \ + ((toast_compress_header_ext *) (ptr))->dictid = (dictid); \ + } \ } while (0) -extern Datum toast_compress_datum(Datum value, char cmethod); +extern Datum toast_compress_datum(Datum value, CompressionInfo cmp); extern Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock); extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative); diff --git a/src/include/catalog/Makefile b/src/include/catalog/Makefile index 2bbc7805fe..1ecd76dd31 100644 --- a/src/include/catalog/Makefile +++ b/src/include/catalog/Makefile @@ -81,7 +81,8 @@ CATALOG_HEADERS := \ pg_publication_namespace.h \ pg_publication_rel.h \ pg_subscription.h \ - pg_subscription_rel.h + pg_subscription_rel.h \ + pg_zstd_dictionaries.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index f427a89618..7cea56c2d5 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202503071 +#define CATALOG_VERSION_NO 202503081 #endif diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build index ec1cf467f6..e9cb6d911c 100644 --- a/src/include/catalog/meson.build +++ b/src/include/catalog/meson.build @@ -69,6 +69,7 @@ catalog_headers = [ 'pg_publication_rel.h', 'pg_subscription.h', 'pg_subscription_rel.h', + 'pg_zstd_dictionaries.h', ] # The .dat files we need can just be listed alphabetically. 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index cede992b6e..2bcae1eb52 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12469,4 +12469,14 @@ proargtypes => 'int4', prosrc => 'gist_stratnum_common' }, +# ZSTD generate dictionary training functions +{ oid => '9241', descr => 'ZSTD generate dictionary support', + proname => 'zstd_dictionary_builder', prorettype => 'internal', + proargtypes => 'internal internal', + prosrc => 'zstd_dictionary_builder' }, + +{ oid => '9242', descr => 'Build zstd dictionaries for a column.', + proname => 'build_zstd_dict_for_attribute', prorettype => 'bool', + proargtypes => 'text int4', + prosrc => 'build_zstd_dict_for_attribute' }, ] diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index 6dca77e0a2..58a389a78c 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -40,7 +40,7 @@ descr => 'variable-length string, binary values escaped', typname => 'bytea', typlen => '-1', typbyval => 'f', typcategory => 'U', typinput => 'byteain', typoutput => 'byteaout', typreceive => 'bytearecv', - typsend => 'byteasend', typalign => 'i', typstorage => 'x' }, + typsend => 'byteasend', typalign => 'i', typstorage => 'x', typebuildzstddictionary => 'zstd_dictionary_builder' }, { oid => '18', array_type_oid => '1002', descr => 'single character', typname => 'char', typlen => '1', typbyval => 't', typcategory => 'Z', typinput => 'charin', typoutput => 'charout', typreceive => 'charrecv', @@ -83,7 +83,7 @@ typname => 'text', typlen => '-1', typbyval => 'f', typcategory => 'S', typispreferred => 't', typinput => 'textin', typoutput => 'textout', typreceive => 'textrecv', typsend => 'textsend', typalign => 'i', - typstorage => 'x', typcollation => 'default' }, + typstorage => 'x', typebuildzstddictionary => 'zstd_dictionary_builder', typcollation => 'default' }, { oid => '26', array_type_oid => '1028', descr => 'object 
identifier(oid), maximum 4 billion', typname => 'oid', typlen => '4', typbyval => 't', typcategory => 'N', @@ -446,7 +446,7 @@ typname => 'jsonb', typlen => '-1', typbyval => 'f', typcategory => 'U', typsubscript => 'jsonb_subscript_handler', typinput => 'jsonb_in', typoutput => 'jsonb_out', typreceive => 'jsonb_recv', typsend => 'jsonb_send', - typalign => 'i', typstorage => 'x' }, + typalign => 'i', typstorage => 'x', typebuildzstddictionary => 'zstd_dictionary_builder' }, { oid => '4072', array_type_oid => '4073', descr => 'JSON path', typname => 'jsonpath', typlen => '-1', typbyval => 'f', typcategory => 'U', typinput => 'jsonpath_in', typoutput => 'jsonpath_out', diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h index ff666711a5..bd82da8a88 100644 --- a/src/include/catalog/pg_type.h +++ b/src/include/catalog/pg_type.h @@ -227,6 +227,11 @@ CATALOG(pg_type,1247,TypeRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(71,TypeRelati */ Oid typcollation BKI_DEFAULT(0) BKI_LOOKUP_OPT(pg_collation); + /* + * Custom generate dictionary procedure for the datatype (0 selects the + * default). 
+ */ +	regproc		typebuildzstddictionary BKI_DEFAULT(-) BKI_ARRAY_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc); #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* @@ -380,7 +385,8 @@ extern ObjectAddress TypeCreate(Oid newTypeOid, int32 typeMod, int32 typNDims, bool typeNotNull, - Oid typeCollation); + Oid typeCollation, + Oid generateDictionaryProcedure); extern void GenerateTypeDependencies(HeapTuple typeTuple, Relation typeCatalog, diff --git a/src/include/catalog/pg_zstd_dictionaries.h b/src/include/catalog/pg_zstd_dictionaries.h new file mode 100644 index 0000000000..5b0b729283 --- /dev/null +++ b/src/include/catalog/pg_zstd_dictionaries.h @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * pg_zstd_dictionaries.h + * definition of the "zstd dictionary" system catalog (pg_zstd_dictionaries) + * + * src/include/catalog/pg_zstd_dictionaries.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_ZSTD_DICTIONARIES_H +#define PG_ZSTD_DICTIONARIES_H + +#include "catalog/genbki.h" +#include "catalog/pg_zstd_dictionaries_d.h" + +/* ---------------- + * pg_zstd_dictionaries definition. 
cpp turns this into + * typedef struct FormData_pg_zstd_dictionaries + * ---------------- + */ +CATALOG(pg_zstd_dictionaries,9946,ZstdDictionariesRelationId) +{ + Oid dictid BKI_FORCE_NOT_NULL; + + /* + * variable-length fields start here, but we allow direct access to dict + */ + bytea dict BKI_FORCE_NOT_NULL; +} FormData_pg_zstd_dictionaries; + +/* Pointer type to a tuple with the format of pg_zstd_dictionaries relation */ +typedef FormData_pg_zstd_dictionaries *Form_pg_zstd_dictionaries; + +DECLARE_TOAST(pg_zstd_dictionaries, 9947, 9948); + +DECLARE_UNIQUE_INDEX_PKEY(pg_zstd_dictionaries_dictid_index, 9949, ZstdDictidIndexId, pg_zstd_dictionaries, btree(dictid oid_ops)); + +MAKE_SYSCACHE(ZSTDDICTIDOID, pg_zstd_dictionaries_dictid_index, 128); + +typedef struct ZstdTrainingData +{ + char *sample_buffer; /* Pointer to the raw sample buffer */ + size_t *sample_sizes; /* Array of sample sizes */ + size_t nitems; /* Number of sample sizes */ +} ZstdTrainingData; + +extern bytea *get_zstd_dict(Oid dictid); + +#endif /* PG_ZSTD_DICTIONARIES_H */ diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h index f1bd18c49f..e494436870 100644 --- a/src/include/parser/analyze.h +++ b/src/include/parser/analyze.h @@ -17,6 +17,7 @@ #include "nodes/params.h" #include "nodes/queryjumble.h" #include "parser/parse_node.h" +#include "access/htup.h" /* Hook for plugins to get control at end of parse analysis */ typedef void (*post_parse_analyze_hook_type) (ParseState *pstate, @@ -64,4 +65,8 @@ extern List *BuildOnConflictExcludedTargetlist(Relation targetrel, extern SortGroupClause *makeSortGroupClauseForSetOp(Oid rescoltype, bool require_hash); +extern int acquire_sample_rows(Relation onerel, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, double *totaldeadrows); + #endif /* ANALYZE_H */ diff --git a/src/include/utils/attoptcache.h b/src/include/utils/attoptcache.h index f684a772af..55a6ac6167 100644 --- a/src/include/utils/attoptcache.h +++ 
b/src/include/utils/attoptcache.h @@ -21,6 +21,12 @@ typedef struct AttributeOpts int32 vl_len_; /* varlena header (do not touch directly!) */ float8 n_distinct; float8 n_distinct_inherited; + double dictid; /* Oid is a 32-bit unsigned integer, but + * relopt_int is limited to INT_MAX, so it + * cannot represent the full range of Oid + * values. */ + int zstd_dict_size; + int zstd_level; } AttributeOpts; extern AttributeOpts *get_attribute_options(Oid attrelid, int attnum); diff --git a/src/include/varatt.h b/src/include/varatt.h index 2e8564d499..5fb0ab8beb 100644 --- a/src/include/varatt.h +++ b/src/include/varatt.h @@ -42,8 +42,9 @@ typedef struct varatt_external * These macros define the "saved size" portion of va_extinfo. Its remaining * two high-order bits identify the compression method. */ -#define VARLENA_EXTSIZE_BITS 30 -#define VARLENA_EXTSIZE_MASK ((1U << VARLENA_EXTSIZE_BITS) - 1) +#define VARLENA_EXTSIZE_BITS 30 +#define VARLENA_EXTSIZE_MASK ((1U << VARLENA_EXTSIZE_BITS) - 1) +#define VARLENA_EXTENDED_COMPRESSION_FLAG 0x3 /* * struct varatt_indirect is a "TOAST pointer" representing an out-of-line @@ -122,6 +123,14 @@ typedef union * compression method; see va_extinfo */ char va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */ } va_compressed; + struct + { + uint32 va_header; + uint32 va_tcinfo; + uint32 va_cmp_alg; + uint32 va_cmp_dictid; + char va_data[FLEXIBLE_ARRAY_MEMBER]; + } va_compressed_ext; } varattrib_4b; typedef struct @@ -242,7 +251,14 @@ typedef struct #endif /* WORDS_BIGENDIAN */ #define VARDATA_4B(PTR) (((varattrib_4b *) (PTR))->va_4byte.va_data) -#define VARDATA_4B_C(PTR) (((varattrib_4b *) (PTR))->va_compressed.va_data) +/* + * If va_tcinfo >> VARLENA_EXTSIZE_BITS == VARLENA_EXTENDED_COMPRESSION_FLAG + * use va_compressed_ext; otherwise, use the va_compressed. + */ +#define VARDATA_4B_C(PTR) \ +( (((varattrib_4b *)(PTR))->va_compressed.va_tcinfo >> VARLENA_EXTSIZE_BITS) == VARLENA_EXTENDED_COMPRESSION_FLAG \ + ? 
((varattrib_4b *)(PTR))->va_compressed_ext.va_data \ + : ((varattrib_4b *)(PTR))->va_compressed.va_data ) #define VARDATA_1B(PTR) (((varattrib_1b *) (PTR))->va_data) #define VARDATA_1B_E(PTR) (((varattrib_1b_e *) (PTR))->va_data) @@ -252,6 +268,7 @@ typedef struct #define VARHDRSZ_EXTERNAL offsetof(varattrib_1b_e, va_data) #define VARHDRSZ_COMPRESSED offsetof(varattrib_4b, va_compressed.va_data) +#define VARHDRSZ_COMPRESSED_EXT offsetof(varattrib_4b, va_compressed_ext.va_data) #define VARHDRSZ_SHORT offsetof(varattrib_1b, va_data) #define VARATT_SHORT_MAX 0x7F @@ -327,8 +344,20 @@ typedef struct /* Decompressed size and compression method of a compressed-in-line Datum */ #define VARDATA_COMPRESSED_GET_EXTSIZE(PTR) \ (((varattrib_4b *) (PTR))->va_compressed.va_tcinfo & VARLENA_EXTSIZE_MASK) -#define VARDATA_COMPRESSED_GET_COMPRESS_METHOD(PTR) \ - (((varattrib_4b *) (PTR))->va_compressed.va_tcinfo >> VARLENA_EXTSIZE_BITS) +/* + * - "Extended" format is indicated by (va_tcinfo >> VARLENA_EXTSIZE_BITS) == VARLENA_EXTENDED_COMPRESSION_FLAG + * - For the non-extended formats, the method code is stored in the top bits of va_tcinfo. + * - In the extended format, the method code is stored in va_cmp_alg instead. + */ +#define VARDATA_COMPRESSED_GET_COMPRESS_METHOD(PTR) \ +( ((((varattrib_4b *) (PTR))->va_compressed.va_tcinfo >> VARLENA_EXTSIZE_BITS) == VARLENA_EXTENDED_COMPRESSION_FLAG ) \ + ? (((varattrib_4b *) (PTR))->va_compressed_ext.va_cmp_alg) \ + : ( (((varattrib_4b *) (PTR))->va_compressed.va_tcinfo) >> VARLENA_EXTSIZE_BITS)) + +#define VARDATA_COMPRESSED_GET_DICTID(PTR) \ + ( ((((varattrib_4b *) (PTR))->va_compressed.va_tcinfo >> VARLENA_EXTSIZE_BITS) == VARLENA_EXTENDED_COMPRESSION_FLAG ) \ + ? 
(((varattrib_4b *) (PTR))->va_compressed_ext.va_cmp_dictid) \ + : InvalidDictId) /* Same for external Datums; but note argument is a struct varatt_external */ #define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer) \ @@ -336,13 +365,27 @@ typedef struct #define VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer) \ ((toast_pointer).va_extinfo >> VARLENA_EXTSIZE_BITS) -#define VARATT_EXTERNAL_SET_SIZE_AND_COMPRESS_METHOD(toast_pointer, len, cm) \ - do { \ - Assert((cm) == TOAST_PGLZ_COMPRESSION_ID || \ - (cm) == TOAST_LZ4_COMPRESSION_ID); \ - ((toast_pointer).va_extinfo = \ - (len) | ((uint32) (cm) << VARLENA_EXTSIZE_BITS)); \ - } while (0) +#define VARATT_EXTERNAL_SET_SIZE_AND_COMPRESS_METHOD(toast_pointer, len, cm) \ + do { \ + /* If desired, keep or expand the Assert checks for known methods: */ \ + Assert((cm) == TOAST_PGLZ_COMPRESSION_ID || \ + (cm) == TOAST_LZ4_COMPRESSION_ID || \ + (cm) == TOAST_ZSTD_COMPRESSION_ID); \ + if ((cm) < TOAST_ZSTD_COMPRESSION_ID) \ + { \ + /* Store the actual method in va_extinfo */ \ + (toast_pointer).va_extinfo = (uint32)(len) \ + | ((uint32)(cm) << VARLENA_EXTSIZE_BITS); \ + } \ + else \ + { \ + /* Store VARLENA_EXTENDED_COMPRESSION_FLAG in the top bits, \ + meaning "extended" method. */ \ + (toast_pointer).va_extinfo = (uint32)(len) | \ + ((uint32)VARLENA_EXTENDED_COMPRESSION_FLAG \ + << VARLENA_EXTSIZE_BITS); \ + } \ + } while (0) /* * Testing whether an externally-stored value is compressed now requires diff --git a/src/test/regress/expected/compression.out b/src/test/regress/expected/compression.out index 4dd9ee7200..94495388ad 100644 --- a/src/test/regress/expected/compression.out +++ b/src/test/regress/expected/compression.out @@ -238,10 +238,11 @@ NOTICE: merging multiple inherited definitions of column "f1" -- test default_toast_compression GUC SET default_toast_compression = ''; ERROR: invalid value for parameter "default_toast_compression": "" -HINT: Available values: pglz, lz4. +HINT: Available values: pglz, lz4, zstd. 
SET default_toast_compression = 'I do not exist compression'; ERROR: invalid value for parameter "default_toast_compression": "I do not exist compression" -HINT: Available values: pglz, lz4. +HINT: Available values: pglz, lz4, zstd. +SET default_toast_compression = 'zstd'; SET default_toast_compression = 'lz4'; SET default_toast_compression = 'pglz'; -- test alter compression method diff --git a/src/test/regress/expected/compression_1.out b/src/test/regress/expected/compression_1.out index 7bd7642b4b..0ce4915217 100644 --- a/src/test/regress/expected/compression_1.out +++ b/src/test/regress/expected/compression_1.out @@ -233,6 +233,9 @@ HINT: Available values: pglz. SET default_toast_compression = 'I do not exist compression'; ERROR: invalid value for parameter "default_toast_compression": "I do not exist compression" HINT: Available values: pglz. +SET default_toast_compression = 'zstd'; +ERROR: invalid value for parameter "default_toast_compression": "zstd" +HINT: Available values: pglz. SET default_toast_compression = 'lz4'; ERROR: invalid value for parameter "default_toast_compression": "lz4" HINT: Available values: pglz. diff --git a/src/test/regress/expected/compression_zstd.out b/src/test/regress/expected/compression_zstd.out new file mode 100644 index 0000000000..7de110a90a --- /dev/null +++ b/src/test/regress/expected/compression_zstd.out @@ -0,0 +1,123 @@ +\set HIDE_TOAST_COMPRESSION false +-- Ensure stable results regardless of the installation's default. +SET default_toast_compression = 'pglz'; +---------------------------------------------------------------- +-- 1. Create Test Table with Zstd Compression +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd CASCADE; +NOTICE: table "cmdata_zstd" does not exist, skipping +CREATE TABLE cmdata_zstd ( + f1 TEXT COMPRESSION zstd +); +ERROR: compression method zstd not supported +DETAIL: This functionality requires the server to be built with zstd support. 
+---------------------------------------------------------------- +-- 2. Insert Data Rows +---------------------------------------------------------------- +DO $$ +BEGIN + FOR i IN 1..15 LOOP + INSERT INTO cmdata_zstd (f1) VALUES (repeat('1234567890', 1004)); + END LOOP; +END $$; +ERROR: relation "cmdata_zstd" does not exist +LINE 1: INSERT INTO cmdata_zstd (f1) VALUES (repeat('1234567890', 10... + ^ +QUERY: INSERT INTO cmdata_zstd (f1) VALUES (repeat('1234567890', 1004)) +CONTEXT: PL/pgSQL function inline_code_block line 4 at SQL statement +-- Create a helper function to generate extra-large values. +CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS +$$ + SELECT string_agg(md5(g::text), '') + FROM generate_series(1,256) g +$$; +-- Insert 5 extra-large rows to force externally stored compression. +DO $$ +BEGIN + FOR i IN 1..5 LOOP + INSERT INTO cmdata_zstd (f1) + VALUES (large_val() || repeat('a', 4000)); + END LOOP; +END $$; +ERROR: relation "cmdata_zstd" does not exist +LINE 1: INSERT INTO cmdata_zstd (f1) + ^ +QUERY: INSERT INTO cmdata_zstd (f1) + VALUES (large_val() || repeat('a', 4000)) +CONTEXT: PL/pgSQL function inline_code_block line 4 at SQL statement +---------------------------------------------------------------- +-- 3. Verify Table Structure and Compression Settings +---------------------------------------------------------------- +-- Table Structure for cmdata_zstd +\d+ cmdata_zstd; +-- Compression Settings for f1 Column +SELECT pg_column_compression(f1) AS compression_method, + count(*) AS row_count +FROM cmdata_zstd +GROUP BY pg_column_compression(f1); +ERROR: relation "cmdata_zstd" does not exist +LINE 3: FROM cmdata_zstd + ^ +---------------------------------------------------------------- +-- 4. 
Decompression Tests +---------------------------------------------------------------- +-- Decompression Slice Test (Extracting Substrings) +SELECT SUBSTR(f1, 200, 50) AS data_slice +FROM cmdata_zstd; +ERROR: relation "cmdata_zstd" does not exist +LINE 2: FROM cmdata_zstd; + ^ +---------------------------------------------------------------- +-- 5. Test Table Creation with LIKE INCLUDING COMPRESSION +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd_2; +NOTICE: table "cmdata_zstd_2" does not exist, skipping +CREATE TABLE cmdata_zstd_2 (LIKE cmdata_zstd INCLUDING COMPRESSION); +ERROR: relation "cmdata_zstd" does not exist +LINE 1: CREATE TABLE cmdata_zstd_2 (LIKE cmdata_zstd INCLUDING COMPR... + ^ +-- Table Structure for cmdata_zstd_2 +\d+ cmdata_zstd_2; +DROP TABLE cmdata_zstd_2; +ERROR: table "cmdata_zstd_2" does not exist +---------------------------------------------------------------- +-- 6. Materialized View Compression Test +---------------------------------------------------------------- +DROP MATERIALIZED VIEW IF EXISTS compressmv_zstd; +NOTICE: materialized view "compressmv_zstd" does not exist, skipping +CREATE MATERIALIZED VIEW compressmv_zstd AS + SELECT f1 FROM cmdata_zstd; +ERROR: relation "cmdata_zstd" does not exist +LINE 2: SELECT f1 FROM cmdata_zstd; + ^ +-- Materialized View Structure for compressmv_zstd +\d+ compressmv_zstd; +-- Materialized View Compression Check +SELECT pg_column_compression(f1) AS mv_compression +FROM compressmv_zstd; +ERROR: relation "compressmv_zstd" does not exist +LINE 2: FROM compressmv_zstd; + ^ +---------------------------------------------------------------- +-- 7. Additional Updates and Round-Trip Tests +---------------------------------------------------------------- +-- Update some rows to check if the dictionary remains effective after modifications. 
+UPDATE cmdata_zstd +SET f1 = f1 || ' UPDATED'; +ERROR: relation "cmdata_zstd" does not exist +LINE 1: UPDATE cmdata_zstd + ^ +-- Verification of Updated Rows +SELECT SUBSTR(f1, LENGTH(f1) - 7 + 1, 7) AS preview +FROM cmdata_zstd; +ERROR: relation "cmdata_zstd" does not exist +LINE 2: FROM cmdata_zstd; + ^ +---------------------------------------------------------------- +-- 8. Clean Up +---------------------------------------------------------------- +DROP MATERIALIZED VIEW compressmv_zstd; +ERROR: materialized view "compressmv_zstd" does not exist +DROP TABLE cmdata_zstd; +ERROR: table "cmdata_zstd" does not exist +\set HIDE_TOAST_COMPRESSION true diff --git a/src/test/regress/expected/compression_zstd_1.out b/src/test/regress/expected/compression_zstd_1.out new file mode 100644 index 0000000000..a540c99b37 --- /dev/null +++ b/src/test/regress/expected/compression_zstd_1.out @@ -0,0 +1,181 @@ +\set HIDE_TOAST_COMPRESSION false +-- Ensure stable results regardless of the installation's default. +SET default_toast_compression = 'pglz'; +---------------------------------------------------------------- +-- 1. Create Test Table with Zstd Compression +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd CASCADE; +NOTICE: table "cmdata_zstd" does not exist, skipping +CREATE TABLE cmdata_zstd ( + f1 TEXT COMPRESSION zstd +); +---------------------------------------------------------------- +-- 2. Insert Data Rows +---------------------------------------------------------------- +DO $$ +BEGIN + FOR i IN 1..15 LOOP + INSERT INTO cmdata_zstd (f1) VALUES (repeat('1234567890', 1004)); + END LOOP; +END $$; +-- Create a helper function to generate extra-large values. +CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS +$$ + SELECT string_agg(md5(g::text), '') + FROM generate_series(1,256) g +$$; +-- Insert 5 extra-large rows to force externally stored compression. 
+DO $$ +BEGIN + FOR i IN 1..5 LOOP + INSERT INTO cmdata_zstd (f1) + VALUES (large_val() || repeat('a', 4000)); + END LOOP; +END $$; +---------------------------------------------------------------- +-- 3. Verify Table Structure and Compression Settings +---------------------------------------------------------------- +-- Table Structure for cmdata_zstd +\d+ cmdata_zstd; + Table "public.cmdata_zstd" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +--------+------+-----------+----------+---------+----------+-------------+--------------+------------- + f1 | text | | | | extended | zstd | | + +-- Compression Settings for f1 Column +SELECT pg_column_compression(f1) AS compression_method, + count(*) AS row_count +FROM cmdata_zstd +GROUP BY pg_column_compression(f1); + compression_method | row_count +--------------------+----------- + zstd | 20 +(1 row) + +---------------------------------------------------------------- +-- 4. Decompression Tests +---------------------------------------------------------------- +-- Decompression Slice Test (Extracting Substrings) +SELECT SUBSTR(f1, 200, 50) AS data_slice +FROM cmdata_zstd; + data_slice +---------------------------------------------------- + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 
01234567890123456789012345678901234567890123456789 + fceea167a5a36dedd4bea2543c9f0f895fb98ab9159f51fd02 + fceea167a5a36dedd4bea2543c9f0f895fb98ab9159f51fd02 + fceea167a5a36dedd4bea2543c9f0f895fb98ab9159f51fd02 + fceea167a5a36dedd4bea2543c9f0f895fb98ab9159f51fd02 + fceea167a5a36dedd4bea2543c9f0f895fb98ab9159f51fd02 +(20 rows) + +---------------------------------------------------------------- +-- 5. Test Table Creation with LIKE INCLUDING COMPRESSION +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd_2; +NOTICE: table "cmdata_zstd_2" does not exist, skipping +CREATE TABLE cmdata_zstd_2 (LIKE cmdata_zstd INCLUDING COMPRESSION); +-- Table Structure for cmdata_zstd_2 +\d+ cmdata_zstd_2; + Table "public.cmdata_zstd_2" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +--------+------+-----------+----------+---------+----------+-------------+--------------+------------- + f1 | text | | | | extended | zstd | | + +DROP TABLE cmdata_zstd_2; +---------------------------------------------------------------- +-- 6. 
Materialized View Compression Test +---------------------------------------------------------------- +DROP MATERIALIZED VIEW IF EXISTS compressmv_zstd; +NOTICE: materialized view "compressmv_zstd" does not exist, skipping +CREATE MATERIALIZED VIEW compressmv_zstd AS + SELECT f1 FROM cmdata_zstd; +-- Materialized View Structure for compressmv_zstd +\d+ compressmv_zstd; + Materialized view "public.compressmv_zstd" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +--------+------+-----------+----------+---------+----------+-------------+--------------+------------- + f1 | text | | | | extended | | | +View definition: + SELECT f1 + FROM cmdata_zstd; + +-- Materialized View Compression Check +SELECT pg_column_compression(f1) AS mv_compression +FROM compressmv_zstd; + mv_compression +---------------- + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd + zstd +(20 rows) + +---------------------------------------------------------------- +-- 7. Additional Updates and Round-Trip Tests +---------------------------------------------------------------- +-- Update some rows to check if the dictionary remains effective after modifications. +UPDATE cmdata_zstd +SET f1 = f1 || ' UPDATED'; +-- Verification of Updated Rows +SELECT SUBSTR(f1, LENGTH(f1) - 7 + 1, 7) AS preview +FROM cmdata_zstd; + preview +--------- + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED + UPDATED +(20 rows) + +---------------------------------------------------------------- +-- 8. 
Clean Up +---------------------------------------------------------------- +DROP MATERIALIZED VIEW compressmv_zstd; +DROP TABLE cmdata_zstd; +\set HIDE_TOAST_COMPRESSION true diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be..ac5da3f5ab 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -71,6 +71,7 @@ NOTICE: checking pg_type {typmodout} => pg_proc {oid} NOTICE: checking pg_type {typanalyze} => pg_proc {oid} NOTICE: checking pg_type {typbasetype} => pg_type {oid} NOTICE: checking pg_type {typcollation} => pg_collation {oid} +NOTICE: checking pg_type {typebuildzstddictionary} => pg_proc {oid} NOTICE: checking pg_attribute {attrelid} => pg_class {oid} NOTICE: checking pg_attribute {atttypid} => pg_type {oid} NOTICE: checking pg_attribute {attcollation} => pg_collation {oid} diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 37b6d21e1f..407a0644f8 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -119,7 +119,7 @@ test: plancache limit plpgsql copy2 temp domain rangefuncs prepare conversion tr # The stats test resets stats, so nothing else needing stats access can be in # this group. 
# ---------- -test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression memoize stats predicate +test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression compression_zstd memoize stats predicate # event_trigger depends on create_am and cannot run concurrently with # any test that runs DDL diff --git a/src/test/regress/sql/compression.sql b/src/test/regress/sql/compression.sql index 490595fcfb..e29909558f 100644 --- a/src/test/regress/sql/compression.sql +++ b/src/test/regress/sql/compression.sql @@ -102,6 +102,7 @@ CREATE TABLE cminh() INHERITS (cmdata, cmdata3); -- test default_toast_compression GUC SET default_toast_compression = ''; SET default_toast_compression = 'I do not exist compression'; +SET default_toast_compression = 'zstd'; SET default_toast_compression = 'lz4'; SET default_toast_compression = 'pglz'; diff --git a/src/test/regress/sql/compression_zstd.sql b/src/test/regress/sql/compression_zstd.sql new file mode 100644 index 0000000000..7cf93e3de2 --- /dev/null +++ b/src/test/regress/sql/compression_zstd.sql @@ -0,0 +1,97 @@ +\set HIDE_TOAST_COMPRESSION false + +-- Ensure stable results regardless of the installation's default. +SET default_toast_compression = 'pglz'; + +---------------------------------------------------------------- +-- 1. Create Test Table with Zstd Compression +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd CASCADE; +CREATE TABLE cmdata_zstd ( + f1 TEXT COMPRESSION zstd +); + +---------------------------------------------------------------- +-- 2. Insert Data Rows +---------------------------------------------------------------- +DO $$ +BEGIN + FOR i IN 1..15 LOOP + INSERT INTO cmdata_zstd (f1) VALUES (repeat('1234567890', 1004)); + END LOOP; +END $$; + +-- Create a helper function to generate extra-large values. 
+CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS +$$ + SELECT string_agg(md5(g::text), '') + FROM generate_series(1,256) g +$$; + +-- Insert 5 extra-large rows to force externally stored compression. +DO $$ +BEGIN + FOR i IN 1..5 LOOP + INSERT INTO cmdata_zstd (f1) + VALUES (large_val() || repeat('a', 4000)); + END LOOP; +END $$; + +---------------------------------------------------------------- +-- 3. Verify Table Structure and Compression Settings +---------------------------------------------------------------- +-- Table Structure for cmdata_zstd +\d+ cmdata_zstd; + +-- Compression Settings for f1 Column +SELECT pg_column_compression(f1) AS compression_method, + count(*) AS row_count +FROM cmdata_zstd +GROUP BY pg_column_compression(f1); + +---------------------------------------------------------------- +-- 4. Decompression Tests +---------------------------------------------------------------- +-- Decompression Slice Test (Extracting Substrings) +SELECT SUBSTR(f1, 200, 50) AS data_slice +FROM cmdata_zstd; + +---------------------------------------------------------------- +-- 5. Test Table Creation with LIKE INCLUDING COMPRESSION +---------------------------------------------------------------- +DROP TABLE IF EXISTS cmdata_zstd_2; +CREATE TABLE cmdata_zstd_2 (LIKE cmdata_zstd INCLUDING COMPRESSION); +-- Table Structure for cmdata_zstd_2 +\d+ cmdata_zstd_2; +DROP TABLE cmdata_zstd_2; + +---------------------------------------------------------------- +-- 6. Materialized View Compression Test +---------------------------------------------------------------- +DROP MATERIALIZED VIEW IF EXISTS compressmv_zstd; +CREATE MATERIALIZED VIEW compressmv_zstd AS + SELECT f1 FROM cmdata_zstd; +-- Materialized View Structure for compressmv_zstd +\d+ compressmv_zstd; +-- Materialized View Compression Check +SELECT pg_column_compression(f1) AS mv_compression +FROM compressmv_zstd; + +---------------------------------------------------------------- +-- 7. 
Additional Updates and Round-Trip Tests +---------------------------------------------------------------- +-- Update some rows to check if the dictionary remains effective after modifications. +UPDATE cmdata_zstd +SET f1 = f1 || ' UPDATED'; + +-- Verification of Updated Rows +SELECT SUBSTR(f1, LENGTH(f1) - 7 + 1, 7) AS preview +FROM cmdata_zstd; + +---------------------------------------------------------------- +-- 8. Clean Up +---------------------------------------------------------------- +DROP MATERIALIZED VIEW compressmv_zstd; +DROP TABLE cmdata_zstd; + +\set HIDE_TOAST_COMPRESSION true diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9840060997..adb94ee7fd 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -886,6 +886,7 @@ FormData_pg_ts_parser FormData_pg_ts_template FormData_pg_type FormData_pg_user_mapping +FormData_pg_zstd_dictionaries FormExtraData_pg_attribute Form_pg_aggregate Form_pg_am @@ -945,6 +946,7 @@ Form_pg_ts_parser Form_pg_ts_template Form_pg_type Form_pg_user_mapping +Form_pg_zstd_dictionaries FormatNode FreeBlockNumberArray FreeListData @@ -2582,6 +2584,8 @@ STARTUPINFO STRLEN SV SYNCHRONIZATION_BARRIER +SampleCollector +SampleEntry SampleScan SampleScanGetSampleSize_function SampleScanState @@ -3306,6 +3310,7 @@ ZSTD_cParameter ZSTD_inBuffer ZSTD_outBuffer ZstdCompressorState +ZstdTrainingData _SPI_connection _SPI_plan __m128i base-commit: 8021c77769e90cc804121d61a1bb7bcc4652d48b -- 2.47.1