From ef6ad37d547e158f11b4fc73439ebcd9c9bd253f Mon Sep 17 00:00:00 2001 From: Julien Tachoires Date: Thu, 18 Jul 2024 07:36:04 -0700 Subject: [PATCH 3/6] Compress ReorderBuffer spill files using PGLZ --- .../logical/reorderbuffer_compression.c | 58 +++++++++++++++++++ .../replication/reorderbuffer_compression.h | 2 + 2 files changed, 60 insertions(+) diff --git a/src/backend/replication/logical/reorderbuffer_compression.c b/src/backend/replication/logical/reorderbuffer_compression.c index 77f5c76929..a05393cc61 100644 --- a/src/backend/replication/logical/reorderbuffer_compression.c +++ b/src/backend/replication/logical/reorderbuffer_compression.c @@ -13,6 +13,8 @@ */ #include "postgres.h" +#include "common/pg_lzcompress.h" + #ifdef USE_LZ4 #include #endif @@ -313,6 +315,7 @@ ReorderBufferNewCompressorState(MemoryContext context, int compression_method) return lz4_NewCompressorState(context); break; case REORDER_BUFFER_NO_COMPRESSION: + case REORDER_BUFFER_PGLZ_COMPRESSION: default: return NULL; break; @@ -333,6 +336,7 @@ ReorderBufferFreeCompressorState(MemoryContext context, int compression_method, return lz4_FreeCompressorState(context, compressor_state); break; case REORDER_BUFFER_NO_COMPRESSION: + case REORDER_BUFFER_PGLZ_COMPRESSION: default: break; } @@ -421,6 +425,40 @@ ReorderBufferCompress(ReorderBuffer *rb, ReorderBufferDiskHeader **header, pfree(dst); + break; + } + /* PGLZ compression */ + case REORDER_BUFFER_PGLZ_COMPRESSION: + { + int32 dst_size = 0; + char *dst = NULL; + char *src = (char *) rb->outbuf + sizeof(ReorderBufferDiskHeader); + int32 src_size = data_size - sizeof(ReorderBufferDiskHeader); + int32 max_size = PGLZ_MAX_OUTPUT(src_size); + + dst = (char *) palloc0(max_size); + dst_size = pglz_compress(src, src_size, dst, PGLZ_strategy_always); + + if (dst_size < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("PGLZ compression failed"))); + + ReorderBufferReserve(rb, (Size) (dst_size + sizeof(ReorderBufferDiskHeader))); + + hdr = (ReorderBufferDiskHeader *) rb->outbuf; + hdr->comp_strat = REORDER_BUFFER_STRAT_PGLZ; + hdr->size = (Size) dst_size + sizeof(ReorderBufferDiskHeader); + hdr->raw_size = (Size) src_size; + + *header = hdr; + + /* Copy back compressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), dst, + dst_size); + + pfree(dst); + break; } } @@ -495,6 +533,26 @@ ReorderBufferDecompress(ReorderBuffer *rb, char *data, */ break; } + /* PGLZ decompression */ + case REORDER_BUFFER_STRAT_PGLZ: + { + char *buf; + int32 src_size = (int32) header->size - sizeof(ReorderBufferDiskHeader); + int32 buf_size = (int32) header->raw_size; + int32 decBytes; + + /* Decompress data directly into the ReorderBuffer */ + buf = (char *) rb->outbuf; + buf += sizeof(ReorderBufferDiskHeader); + + decBytes = pglz_decompress(data, src_size, buf, buf_size, false); + + if (decBytes < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed PGLZ data is corrupted"))); + break; + } default: /* Other compression methods not yet supported */ break; diff --git a/src/include/replication/reorderbuffer_compression.h b/src/include/replication/reorderbuffer_compression.h index 9aa8aea56f..5abc875ac4 100644 --- a/src/include/replication/reorderbuffer_compression.h +++ b/src/include/replication/reorderbuffer_compression.h @@ -24,6 +24,7 @@ typedef enum ReorderBufferCompressionMethod { REORDER_BUFFER_NO_COMPRESSION, REORDER_BUFFER_LZ4_COMPRESSION, + REORDER_BUFFER_PGLZ_COMPRESSION, } ReorderBufferCompressionMethod; /* @@ -34,6 +35,7 @@ typedef enum ReorderBufferCompressionStrategy REORDER_BUFFER_STRAT_UNCOMPRESSED, REORDER_BUFFER_STRAT_LZ4_STREAMING, REORDER_BUFFER_STRAT_LZ4_REGULAR, + REORDER_BUFFER_STRAT_PGLZ, } ReorderBufferCompressionStrategy; /* Disk serialization support datastructures */ -- 2.43.0