From 79753deeef3ebf971df70dbaa974c9ec8a1f74b6 Mon Sep 17 00:00:00 2001 From: Julien Tachoires Date: Fri, 5 Jul 2024 05:25:48 -0700 Subject: [PATCH 5/7] Compress ReorderBuffer spill files using ZSTD --- doc/src/sgml/config.sgml | 4 +- .../logical/reorderbuffer_compression.c | 364 ++++++++++++++++++ src/backend/utils/misc/guc_tables.c | 3 + .../replication/reorderbuffer_compression.h | 39 ++ 4 files changed, 409 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 2697ebc435..7629e10f45 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2019,7 +2019,9 @@ include_dir 'conf.d' parameter has not effect if there is no data to spill on disk. The supported methods are pglz, lz4 (if PostgreSQL was compiled with - ) and off>. + ), zstd (if + PostgreSQL was compiled with + ) and off. The default value is off. Only superusers and users with the appropriate SET privilege can change this setting. diff --git a/src/backend/replication/logical/reorderbuffer_compression.c b/src/backend/replication/logical/reorderbuffer_compression.c index a05393cc61..9bda286cb8 100644 --- a/src/backend/replication/logical/reorderbuffer_compression.c +++ b/src/backend/replication/logical/reorderbuffer_compression.c @@ -19,6 +19,10 @@ #include #endif +#ifdef USE_ZSTD +#include +#endif + #include "replication/reorderbuffer_compression.h" #define NO_LZ4_SUPPORT() \ @@ -27,6 +31,12 @@ errmsg("compression method lz4 not supported"), \ errdetail("This functionality requires the server to be built with lz4 support."))) +#define NO_ZSTD_SUPPORT() \ + ereport(ERROR, \ + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ + errmsg("compression method zstd not supported"), \ + errdetail("This functionality requires the server to be built with zstd support."))) + /* * Allocate a new LZ4StreamingCompressorState. */ @@ -303,6 +313,309 @@ lz4_DecompressData(char *src, Size src_size, char **dst, Size dst_size) #endif } +/* + * Allocate a new ZSTDStreamingCompressorState. + */ +static void * +zstd_NewCompressorState(MemoryContext context) +{ +#ifndef USE_ZSTD + NO_ZSTD_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + ZSTDStreamingCompressorState *cstate; + + cstate = (ZSTDStreamingCompressorState *) + MemoryContextAlloc(context, sizeof(ZSTDStreamingCompressorState)); + + /* + * We do not allocate ZSTD buffers and contexts at this point because we + * have no guarantee that we will need them later. Let's allocate only when + * we are about to use them. + */ + cstate->zstd_c_ctx = NULL; + cstate->zstd_c_in_buf = NULL; + cstate->zstd_c_in_buf_size = 0; + cstate->zstd_c_out_buf = NULL; + cstate->zstd_c_out_buf_size = 0; + cstate->zstd_frame_size = 0; + cstate->zstd_d_ctx = NULL; + cstate->zstd_d_in_buf = NULL; + cstate->zstd_d_in_buf_size = 0; + cstate->zstd_d_out_buf = NULL; + cstate->zstd_d_out_buf_size = 0; + + return (void *) cstate; +#endif +} + +/* + * Free ZSTD memory resources and the compressor state. + */ +static void +zstd_FreeCompressorState(MemoryContext context, void *compressor_state) +{ +#ifndef USE_ZSTD + NO_ZSTD_SUPPORT(); +#else + ZSTDStreamingCompressorState *cstate; + MemoryContext oldcontext; + + if (compressor_state == NULL) + return; + + oldcontext = MemoryContextSwitchTo(context); + + cstate = (ZSTDStreamingCompressorState *) compressor_state; + + if (cstate->zstd_c_ctx != NULL) + { + /* Compressor state was used for compression */ + pfree(cstate->zstd_c_in_buf); + pfree(cstate->zstd_c_out_buf); + ZSTD_freeCCtx(cstate->zstd_c_ctx); + } + if (cstate->zstd_d_ctx != NULL) + { + /* Compressor state was used for decompression */ + pfree(cstate->zstd_d_in_buf); + pfree(cstate->zstd_d_out_buf); + ZSTD_freeDCtx(cstate->zstd_d_ctx); + } + + pfree(compressor_state); + + MemoryContextSwitchTo(oldcontext); +#endif +} + +#ifdef USE_ZSTD +/* + * Allocate ZSTD compression buffers and create the ZSTD compression context. + */ +static void +zstd_CreateStreamCompressorState(MemoryContext context, void *compressor_state) +{ + ZSTDStreamingCompressorState *cstate; + MemoryContext oldcontext = MemoryContextSwitchTo(context); + + cstate = (ZSTDStreamingCompressorState *) compressor_state; + cstate->zstd_c_in_buf_size = ZSTD_CStreamInSize(); + cstate->zstd_c_in_buf = (char *) palloc0(cstate->zstd_c_in_buf_size); + cstate->zstd_c_out_buf_size = ZSTD_CStreamOutSize(); + cstate->zstd_c_out_buf = (char *) palloc0(cstate->zstd_c_out_buf_size); + cstate->zstd_c_ctx = ZSTD_createCCtx(); + + if (cstate->zstd_c_ctx == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("could not create ZSTD compression context"))); + + /* Set compression level */ + ZSTD_CCtx_setParameter(cstate->zstd_c_ctx, ZSTD_c_compressionLevel, + ZSTD_COMPRESSION_LEVEL); + + MemoryContextSwitchTo(oldcontext); +} +#endif + +#ifdef USE_ZSTD +/* + * Allocate ZSTD decompression buffers and create the ZSTD decompression + * context. + */ +static void +zstd_CreateStreamDecodeCompressorState(MemoryContext context, void *compressor_state) +{ + ZSTDStreamingCompressorState *cstate; + MemoryContext oldcontext = MemoryContextSwitchTo(context); + + cstate = (ZSTDStreamingCompressorState *) compressor_state; + cstate->zstd_d_in_buf_size = ZSTD_DStreamInSize(); + cstate->zstd_d_in_buf = (char *) palloc0(cstate->zstd_d_in_buf_size); + cstate->zstd_d_out_buf_size = ZSTD_DStreamOutSize(); + cstate->zstd_d_out_buf = (char *) palloc0(cstate->zstd_d_out_buf_size); + cstate->zstd_d_ctx = ZSTD_createDCtx(); + + if (cstate->zstd_d_ctx == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("could not create ZSTD decompression context"))); + + MemoryContextSwitchTo(oldcontext); +} +#endif + +/* + * Data compression using ZSTD streaming API. + */ +static void +zstd_StreamingCompressData(MemoryContext context, char *src, Size src_size, + char **dst, Size *dst_size, void *compressor_state) +{ +#ifndef USE_ZSTD + NO_ZSTD_SUPPORT(); +#else + ZSTDStreamingCompressorState *cstate; + /* Size of remaining data to be copied from src into ZSTD input buffer */ + Size toCpy = src_size; + char *dst_data; + + cstate = (ZSTDStreamingCompressorState *) compressor_state; + /* Allocate ZSTD buffers and context */ + if (cstate->zstd_c_ctx == NULL) + zstd_CreateStreamCompressorState(context, compressor_state); + + /* Allocate memory that will be used to store compressed data */ + *dst = (char *) palloc0(ZSTD_compressBound(src_size)); + + dst_data = *dst; + *dst_size = 0; + + /* + * ZSTD streaming compression works with chunks: the source data needs to + * be splitted out in chunks, each of them is then copied into ZSTD input + * buffer. + * For each chunk, we proceed with compression. Streaming compression is + * not intended to compress the whole input chunk, so we have the call + * ZSTD_compressStream2() multiple times until the entire chunk is + * consumed. + */ + while (toCpy > 0) + { + /* Are we on the last chunk? */ + bool last_chunk = (toCpy < cstate->zstd_c_in_buf_size); + /* Size of the data copied into ZSTD input buffer */ + Size cpySize = last_chunk ? toCpy : cstate->zstd_c_in_buf_size; + bool finished = false; + ZSTD_inBuffer input; + ZSTD_EndDirective mode = last_chunk ? ZSTD_e_flush : ZSTD_e_continue; + + /* Copy data from src into ZSTD input buffer */ + memcpy(cstate->zstd_c_in_buf, src, cpySize); + + /* + * Close the frame when we are on the last chunk and we've reached max + * frame size. + */ + if (last_chunk && (cstate->zstd_frame_size > ZSTD_MAX_FRAME_SIZE)) + { + mode = ZSTD_e_end; + cstate->zstd_frame_size = 0; + } + + cstate->zstd_frame_size += cpySize; + + input.src = cstate->zstd_c_in_buf; + input.size = cpySize; + input.pos = 0; + + do + { + Size remaining; + ZSTD_outBuffer output; + + output.dst = cstate->zstd_c_out_buf; + output.size = cstate->zstd_c_out_buf_size; + output.pos = 0; + + remaining = ZSTD_compressStream2(cstate->zstd_c_ctx, &output, + &input, mode); + + if (ZSTD_isError(remaining)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("ZSTD compression failed"))); + + /* Copy back compressed data from ZSTD output buffer */ + memcpy(dst_data, (char *) cstate->zstd_c_out_buf, output.pos); + + dst_data += output.pos; + *dst_size += output.pos; + + /* + * Compression is done when we are working on the last chunk and + * there is nothing left to compress, or, when we reach the end of + * the chunk. + */ + finished = last_chunk ? (remaining == 0) : (input.pos == input.size); + } while (!finished); + + src += cpySize; + toCpy -= cpySize; + } +#endif +} + +/* + * Data decompression using ZSTD streaming API. + */ +static void +zstd_StreamingDecompressData(MemoryContext context, char *src, Size src_size, + char **dst, Size dst_size, void *compressor_state) +{ +#ifndef USE_ZSTD + NO_ZSTD_SUPPORT(); +#else + ZSTDStreamingCompressorState *cstate; + /* Size of remaining data to be copied from src into ZSTD input buffer */ + Size toCpy = src_size; + char *dst_data; + Size decBytes = 0; /* Size of decompressed data */ + + cstate = (ZSTDStreamingCompressorState *) compressor_state; + /* Allocate ZSTD buffers and context */ + if (cstate->zstd_d_ctx == NULL) + zstd_CreateStreamDecodeCompressorState(context, compressor_state); + + /* Allocate memory that will be used to store decompressed data */ + *dst = (char *) palloc0(dst_size); + + dst_data = *dst; + + while (toCpy > 0) + { + ZSTD_inBuffer input; + Size cpySize = (toCpy > cstate->zstd_d_in_buf_size) ? cstate->zstd_d_in_buf_size : toCpy; + + /* Copy data from src into ZSTD input buffer */ + memcpy(cstate->zstd_d_in_buf, src, cpySize); + + input.src = cstate->zstd_d_in_buf; + input.size = cpySize; + input.pos = 0; + + while (input.pos < input.size) + { + ZSTD_outBuffer output; + Size ret; + + output.dst = cstate->zstd_d_out_buf; + output.size = cstate->zstd_d_out_buf_size; + output.pos = 0; + + ret = ZSTD_decompressStream(cstate->zstd_d_ctx, &output , &input); + + if (ZSTD_isError(ret)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("ZSTD decompression failed"))); + + /* Copy back compressed data from ZSTD output buffer */ + memcpy(dst_data, (char *) cstate->zstd_d_out_buf, output.pos); + + dst_data += output.pos; + decBytes += output.pos; + } + + src += cpySize; + toCpy -= cpySize; + } + + Assert(dst_size == decBytes); +#endif +} + /* * Allocate a new Compressor State, depending on the compression method. */ @@ -314,6 +627,9 @@ ReorderBufferNewCompressorState(MemoryContext context, int compression_method) case REORDER_BUFFER_LZ4_COMPRESSION: return lz4_NewCompressorState(context); break; + case REORDER_BUFFER_ZSTD_COMPRESSION: + return zstd_NewCompressorState(context); + break; case REORDER_BUFFER_NO_COMPRESSION: case REORDER_BUFFER_PGLZ_COMPRESSION: default: @@ -335,6 +651,9 @@ ReorderBufferFreeCompressorState(MemoryContext context, int compression_method, case REORDER_BUFFER_LZ4_COMPRESSION: return lz4_FreeCompressorState(context, compressor_state); break; + case REORDER_BUFFER_ZSTD_COMPRESSION: + return zstd_FreeCompressorState(context, compressor_state); + break; case REORDER_BUFFER_NO_COMPRESSION: case REORDER_BUFFER_PGLZ_COMPRESSION: default: @@ -459,6 +778,35 @@ ReorderBufferCompress(ReorderBuffer *rb, ReorderBufferDiskHeader **header, pfree(dst); + break; + } + /* ZSTD Compression */ + case REORDER_BUFFER_ZSTD_COMPRESSION: + { + char *dst = NULL; + Size dst_size = 0; + char *src = (char *) rb->outbuf + sizeof(ReorderBufferDiskHeader); + Size src_size = data_size - sizeof(ReorderBufferDiskHeader); + + /* Use ZSTD streaming compression */ + zstd_StreamingCompressData(rb->context, src, src_size, &dst, + &dst_size, compressor_state); + + ReorderBufferReserve(rb, (dst_size + sizeof(ReorderBufferDiskHeader))); + + hdr = (ReorderBufferDiskHeader *) rb->outbuf; + hdr->comp_strat = REORDER_BUFFER_STRAT_ZSTD_STREAMING; + hdr->size = dst_size + sizeof(ReorderBufferDiskHeader); + hdr->raw_size = src_size; + + *header = hdr; + + /* Copy back compressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), dst, + dst_size); + + pfree(dst); + break; } } @@ -553,6 +901,22 @@ ReorderBufferDecompress(ReorderBuffer *rb, char *data, errmsg_internal("compressed PGLZ data is corrupted"))); break; } + /* ZSTD streaming decompression */ + case REORDER_BUFFER_STRAT_ZSTD_STREAMING: + { + char *buf; + Size src_size = header->size - sizeof(ReorderBufferDiskHeader); + Size buf_size = header->raw_size; + + zstd_StreamingDecompressData(rb->context, data, src_size, &buf, + buf_size, compressor_state); + + /* Copy decompressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), + buf, buf_size); + pfree(buf); + break; + } default: /* Other compression methods not yet supported */ break; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 0209a3a517..16023fb686 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -488,6 +488,9 @@ static const struct config_enum_entry wal_compression_options[] = { static const struct config_enum_entry logical_decoding_spill_compression_options[] = { #ifdef USE_LZ4 {"lz4", REORDER_BUFFER_LZ4_COMPRESSION, false}, +#endif +#ifdef USE_ZSTD + {"zstd", REORDER_BUFFER_ZSTD_COMPRESSION, false}, #endif {"pglz", REORDER_BUFFER_PGLZ_COMPRESSION, false}, {"off", REORDER_BUFFER_NO_COMPRESSION, false}, diff --git a/src/include/replication/reorderbuffer_compression.h b/src/include/replication/reorderbuffer_compression.h index ea77ed1358..d3a847b770 100644 --- a/src/include/replication/reorderbuffer_compression.h +++ b/src/include/replication/reorderbuffer_compression.h @@ -19,6 +19,10 @@ #include #endif +#ifdef USE_ZSTD +#include +#endif + /* GUC support */ extern PGDLLIMPORT int logical_decoding_spill_compression; @@ -28,6 +32,7 @@ typedef enum ReorderBufferCompressionMethod REORDER_BUFFER_NO_COMPRESSION, REORDER_BUFFER_LZ4_COMPRESSION, REORDER_BUFFER_PGLZ_COMPRESSION, + REORDER_BUFFER_ZSTD_COMPRESSION, } ReorderBufferCompressionMethod; /* @@ -39,6 +44,7 @@ typedef enum ReorderBufferCompressionStrategy REORDER_BUFFER_STRAT_LZ4_STREAMING, REORDER_BUFFER_STRAT_LZ4_REGULAR, REORDER_BUFFER_STRAT_PGLZ, + REORDER_BUFFER_STRAT_ZSTD_STREAMING, } ReorderBufferCompressionStrategy; /* Disk serialization support datastructures */ @@ -84,6 +90,39 @@ typedef struct LZ4StreamingCompressorState { #define lz4_CanDoStreamingCompression(s) (false) #endif +#ifdef USE_ZSTD +/* + * Low compression level provides high compression speed and decent compression + * rate. Minimum level is 1, maximum is 22. + */ +#define ZSTD_COMPRESSION_LEVEL 1 + +/* + * Maximum volume of data encoded in the current ZSTD frame. When this + * threshold is reached then we close the current frame and start a new one. + */ +#define ZSTD_MAX_FRAME_SIZE (64 * 1024) + +/* + * ZSTD streaming compression/decompression handlers and buffers. + */ +typedef struct ZSTDStreamingCompressorState { + /* Compression */ + ZSTD_CCtx *zstd_c_ctx; + Size zstd_c_in_buf_size; + char *zstd_c_in_buf; + Size zstd_c_out_buf_size; + char *zstd_c_out_buf; + Size zstd_frame_size; + /* Decompression */ + ZSTD_DCtx *zstd_d_ctx; + Size zstd_d_in_buf_size; + char *zstd_d_in_buf; + Size zstd_d_out_buf_size; + char *zstd_d_out_buf; +} ZSTDStreamingCompressorState; +#endif + extern void *ReorderBufferNewCompressorState(MemoryContext context, int compression_method); extern void ReorderBufferFreeCompressorState(MemoryContext context, -- 2.43.0