From 2423e35294425d2415d456b5ad920e1dea22279d Mon Sep 17 00:00:00 2001 From: Julien Tachoires Date: Thu, 6 Jun 2024 00:57:38 -0700 Subject: [PATCH 1/7] Compress ReorderBuffer spill files using LZ4 When the content of a large transaction (size exceeding logical_decoding_work_mem) and its sub-transactions has to be reordered during logical decoding, then, all the changes are written on disk in temporary files located in pg_replslot/. This behavior happens only when the subscriber's option "streaming" is set to "off", which is the default value. In this case, large transactions decoding by multiple replication slots can lead to disk space saturation and high I/O utilization. When compiled with LZ4 support (--with-lz4), this patch enables data compression/decompression of these temporary files. Each transaction change that must be written on disk is now compressed and wrapped in a new structure named ReorderBufferDiskHeader. 3 different compression strategies are currently implemented: 1. LZ4 streaming compression is the preferred one and works efficiently for small individual changes. 2. LZ4 regular compression when the changes are too large for using LZ4 streaming API. 3. No compression. --- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/meson.build | 1 + .../replication/logical/reorderbuffer.c | 201 ++++--- .../logical/reorderbuffer_compression.c | 502 ++++++++++++++++++ src/include/replication/reorderbuffer.h | 6 + .../replication/reorderbuffer_compression.h | 95 ++++ 6 files changed, 723 insertions(+), 83 deletions(-) create mode 100644 src/backend/replication/logical/reorderbuffer_compression.c create mode 100644 src/include/replication/reorderbuffer_compression.h diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eeff1c..88bf698a53 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + reorderbuffer_compression.o \ slotsync.o \ snapbuild.o \ tablesync.o \ diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a6de..f0dd82bae2 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'reorderbuffer_compression.c', 'slotsync.c', 'snapbuild.c', 'tablesync.c', diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 00a8327e77..4e08167d03 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -102,6 +102,7 @@ #include "pgstat.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/reorderbuffer_compression.h" #include "replication/slot.h" #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" @@ -112,6 +113,13 @@ #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* GUC */ +#ifdef USE_LZ4 +int logical_decoding_spill_compression = REORDER_BUFFER_LZ4_COMPRESSION; +#else +int logical_decoding_spill_compression = REORDER_BUFFER_NO_COMPRESSION; +#endif + /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt { @@ -173,14 +181,6 @@ typedef struct ReorderBufferToastEnt * main tup */ } ReorderBufferToastEnt; -/* Disk serialization support datastructures */ -typedef struct ReorderBufferDiskChange -{ - Size size; - ReorderBufferChange change; - /* data follows */ -} ReorderBufferDiskChange; - #define IsSpecInsert(action) \ ( \ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ @@ -255,6 +255,8 @@ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *tx int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno); +static bool ReorderBufferReadOndiskChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + TXNEntryFile *file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -427,6 +429,8 @@ ReorderBufferGetTXN(ReorderBuffer *rb) /* InvalidCommandId is not zero, so set it explicitly */ txn->command_id = InvalidCommandId; txn->output_plugin_private = NULL; + txn->compressor_state = ReorderBufferNewCompressorState(rb->context, + logical_decoding_spill_compression); return txn; } @@ -464,6 +468,10 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + ReorderBufferFreeCompressorState(rb->context, + logical_decoding_spill_compression, + txn->compressor_state); + /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); @@ -3776,13 +3784,13 @@ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change) { - ReorderBufferDiskChange *ondisk; - Size sz = sizeof(ReorderBufferDiskChange); + ReorderBufferDiskHeader *disk_hdr; + Size sz = sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); ReorderBufferSerializeReserve(rb, sz); - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; + memcpy((char *)rb->outbuf + sizeof(ReorderBufferDiskHeader), change, sizeof(ReorderBufferChange)); switch (change->action) { @@ -3818,9 +3826,9 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; if (oldlen) { @@ -3850,10 +3858,10 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(Size) + sizeof(Size); ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; /* write the prefix including the size */ memcpy(data, &prefix_size, sizeof(Size)); @@ -3880,10 +3888,10 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sz += inval_size; ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; memcpy(data, change->data.inval.invalidations, inval_size); data += inval_size; @@ -3902,9 +3910,9 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; memcpy(data, snap, sizeof(SnapshotData)); data += sizeof(SnapshotData); @@ -3936,9 +3944,9 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + disk_hdr = (ReorderBufferDiskHeader *) rb->outbuf; memcpy(data, change->data.truncate.relids, size); data += size; @@ -3953,11 +3961,14 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } - ondisk->size = sz; + /* Inplace ReorderBuffer content compression before writing it on disk */ + ReorderBufferCompress(rb, &disk_hdr, logical_decoding_spill_compression, + sz, txn->compressor_state); errno = 0; pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); - if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) + + if (write(fd, rb->outbuf, disk_hdr->size) != disk_hdr->size) { int save_errno = errno; @@ -3982,8 +3993,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (txn->final_lsn < change->lsn) txn->final_lsn = change->lsn; - - Assert(ondisk->change.action == change->action); } /* Returns true, if the output plugin supports streaming, false, otherwise. */ @@ -4252,9 +4261,6 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, while (restored < max_changes_in_memory && *segno <= last_segno) { - int readBytes; - ReorderBufferDiskChange *ondisk; - CHECK_FOR_INTERRUPTS(); if (*fd == -1) @@ -4293,60 +4299,15 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* - * Read the statically sized part of a change which has information - * about the total size. If we couldn't read a record, we're at the - * end of this file. + * Read the full change from disk. + * If ReorderBufferReadOndiskChange returns false, then we are at the + * eof, so, move the next segment. */ - ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - readBytes = FileRead(file->vfd, rb->outbuf, - sizeof(ReorderBufferDiskChange), - file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); - - /* eof */ - if (readBytes == 0) + if (!ReorderBufferReadOndiskChange(rb, txn, file, segno)) { - FileClose(*fd); *fd = -1; - (*segno)++; continue; } - else if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != sizeof(ReorderBufferDiskChange)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", - readBytes, - (uint32) sizeof(ReorderBufferDiskChange)))); - - file->curOffset += readBytes; - - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - ReorderBufferSerializeReserve(rb, - sizeof(ReorderBufferDiskChange) + ondisk->size); - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - readBytes = FileRead(file->vfd, - rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange), - file->curOffset, - WAIT_EVENT_REORDER_BUFFER_READ); - - if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", - readBytes, - (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); - - file->curOffset += readBytes; /* * ok, read a full change from disk, now restore it into proper @@ -4359,6 +4320,83 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, return restored; } +/* + * Read a change spilled to disk and decompress it if compressed. + */ +static bool +ReorderBufferReadOndiskChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + TXNEntryFile *file, XLogSegNo *segno) +{ + int readBytes; + ReorderBufferDiskHeader *disk_hdr; + char *header; /* disk header buffer*/ + char *data; /* data buffer */ + + /* + * Read the statically sized part of a change which has information about + * the total size and compression method. If we couldn't read a record, + * we're at the end of this file. + */ + header = (char *) palloc0(sizeof(ReorderBufferDiskHeader)); + readBytes = FileRead(file->vfd, header, + sizeof(ReorderBufferDiskHeader), + file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); + + /* eof */ + if (readBytes == 0) + { + + FileClose(file->vfd); + (*segno)++; + pfree(header); + + return false; + } + else if (readBytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: %m"))); + else if (readBytes != sizeof(ReorderBufferDiskHeader)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", + readBytes, + (uint32) sizeof(ReorderBufferDiskHeader)))); + + file->curOffset += readBytes; + + disk_hdr = (ReorderBufferDiskHeader *) header; + + /* Read ondisk data */ + data = (char *) palloc0(disk_hdr->size - sizeof(ReorderBufferDiskHeader)); + readBytes = FileRead(file->vfd, + data, + disk_hdr->size - sizeof(ReorderBufferDiskHeader), + file->curOffset, + WAIT_EVENT_REORDER_BUFFER_READ); + + if (readBytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: %m"))); + else if (readBytes != (disk_hdr->size - sizeof(ReorderBufferDiskHeader))) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", + readBytes, + (uint32) (disk_hdr->size - sizeof(ReorderBufferDiskHeader))))); + + /* Decompress data */ + ReorderBufferDecompress(rb, data, disk_hdr, txn->compressor_state); + + pfree(data); + pfree(header); + + file->curOffset += readBytes; + + return true; +} + /* * Convert change from its on-disk format to in-memory format and queue it onto * the TXN's ->changes list. @@ -4371,17 +4409,14 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data) { - ReorderBufferDiskChange *ondisk; ReorderBufferChange *change; - ondisk = (ReorderBufferDiskChange *) data; - change = ReorderBufferGetChange(rb); /* copy static part */ - memcpy(change, &ondisk->change, sizeof(ReorderBufferChange)); + memcpy(change, data + sizeof(ReorderBufferDiskHeader), sizeof(ReorderBufferChange)); - data += sizeof(ReorderBufferDiskChange); + data += sizeof(ReorderBufferDiskHeader) + sizeof(ReorderBufferChange); /* restore individual stuff */ switch (change->action) diff --git a/src/backend/replication/logical/reorderbuffer_compression.c b/src/backend/replication/logical/reorderbuffer_compression.c new file mode 100644 index 0000000000..77f5c76929 --- /dev/null +++ b/src/backend/replication/logical/reorderbuffer_compression.c @@ -0,0 +1,502 @@ +/*------------------------------------------------------------------------- + * + * reorderbuffer_compression.c + * Functions for ReorderBuffer compression. + * + * Copyright (c) 2024-2024, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/common/reorderbuffer_compression.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#ifdef USE_LZ4 +#include +#endif + +#include "replication/reorderbuffer_compression.h" + +#define NO_LZ4_SUPPORT() \ + ereport(ERROR, \ + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ + errmsg("compression method lz4 not supported"), \ + errdetail("This functionality requires the server to be built with lz4 support."))) + +/* + * Allocate a new LZ4StreamingCompressorState. + */ +static void * +lz4_NewCompressorState(MemoryContext context) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + LZ4StreamingCompressorState *cstate; + + cstate = (LZ4StreamingCompressorState *) + MemoryContextAlloc(context, sizeof(LZ4StreamingCompressorState)); + + /* + * We do not allocate LZ4 ring buffers and streaming handlers at this + * point because we have no guarantee that we will need them later. Let's + * allocate only when we are about to use them. + */ + cstate->lz4_in_buf = NULL; + cstate->lz4_out_buf = NULL; + cstate->lz4_in_buf_offset = 0; + cstate->lz4_out_buf_offset = 0; + cstate->lz4_stream = NULL; + cstate->lz4_stream_decode = NULL; + + return (void *) cstate; +#endif +} + +/* + * Free LZ4 memory resources and the compressor state. + */ +static void +lz4_FreeCompressorState(MemoryContext context, void *compressor_state) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#else + LZ4StreamingCompressorState *cstate; + MemoryContext oldcontext; + + if (compressor_state == NULL) + return; + + oldcontext = MemoryContextSwitchTo(context); + + cstate = (LZ4StreamingCompressorState *) compressor_state; + + if (cstate->lz4_in_buf != NULL) + { + pfree(cstate->lz4_in_buf); + LZ4_freeStream(cstate->lz4_stream); + } + if (cstate->lz4_out_buf != NULL) + { + pfree(cstate->lz4_out_buf); + LZ4_freeStreamDecode(cstate->lz4_stream_decode); + } + + pfree(compressor_state); + + MemoryContextSwitchTo(oldcontext); +#endif +} + +#ifdef USE_LZ4 +/* + * Allocate LZ4 input ring buffer and create the streaming compression handler. + */ +static void +lz4_CreateStreamCompressorState(MemoryContext context, void *compressor_state) +{ + LZ4StreamingCompressorState *cstate; + MemoryContext oldcontext = MemoryContextSwitchTo(context); + + cstate = (LZ4StreamingCompressorState *) compressor_state; + cstate->lz4_in_buf = (char *) palloc0(LZ4_RING_BUFFER_SIZE); + cstate->lz4_stream = LZ4_createStream(); + + MemoryContextSwitchTo(oldcontext); +} +#endif + +#ifdef USE_LZ4 +/* + * Allocate LZ4 output ring buffer and create the streaming decompression + * handler. + */ +static void +lz4_CreateStreamDecodeCompressorState(MemoryContext context, + void *compressor_state) +{ + LZ4StreamingCompressorState *cstate; + MemoryContext oldcontext = MemoryContextSwitchTo(context); + + cstate = (LZ4StreamingCompressorState *) compressor_state; + cstate->lz4_out_buf = (char *) palloc0(LZ4_RING_BUFFER_SIZE); + cstate->lz4_stream_decode = LZ4_createStreamDecode(); + + MemoryContextSwitchTo(oldcontext); +} +#endif + +/* + * Data compression using LZ4 streaming API. + * Caller must ensure that the source data can fit in LZ4 input ring buffer, + * this checking must be done by lz4_CanDoStreamingCompression(). + */ +static void +lz4_StreamingCompressData(MemoryContext context, char *src, Size src_size, + char **dst, Size *dst_size, void *compressor_state) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#else + LZ4StreamingCompressorState *cstate; + int lz4_cmp_size = 0; /* compressed size */ + char *buf; /* buffer used for compression */ + Size buf_size; /* buffer size */ + char *lz4_in_bufPtr; /* input ring buffer pointer */ + + cstate = (LZ4StreamingCompressorState *) compressor_state; + + /* Allocate LZ4 input ring buffer and streaming compression handler */ + if (cstate->lz4_in_buf == NULL) + lz4_CreateStreamCompressorState(context, compressor_state); + + /* Ring buffer offset wraparound */ + if ((cstate->lz4_in_buf_offset + src_size) > LZ4_RING_BUFFER_SIZE) + cstate->lz4_in_buf_offset = 0; + + /* Get the pointer of the next entry in the ring buffer */ + lz4_in_bufPtr = cstate->lz4_in_buf + cstate->lz4_in_buf_offset; + + /* Copy data that should be compressed into LZ4 input ring buffer */ + memcpy(lz4_in_bufPtr, src, src_size); + + /* Allocate space for compressed data */ + buf_size = LZ4_COMPRESSBOUND(src_size); + buf = (char *) palloc0(buf_size); + + /* Use LZ4 streaming compression API */ + lz4_cmp_size = LZ4_compress_fast_continue(cstate->lz4_stream, + lz4_in_bufPtr, buf, src_size, + buf_size, 1); + + if (lz4_cmp_size <= 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("LZ4 compression failed"))); + + /* Move the input ring buffer offset */ + cstate->lz4_in_buf_offset += src_size; + + *dst_size = lz4_cmp_size; + *dst = buf; +#endif +} + +/* + * Data compression using LZ4 API. + */ +static void +lz4_CompressData(char *src, Size src_size, char **dst, Size *dst_size) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#else + int lz4_cmp_size = 0; /* compressed size */ + char *buf; /* buffer used for compression */ + Size buf_size; /* buffer size */ + + buf_size = LZ4_COMPRESSBOUND(src_size); + buf = (char *) palloc0(buf_size); + + /* Use LZ4 regular compression API */ + lz4_cmp_size = LZ4_compress_default(src, buf, src_size, buf_size); + + if (lz4_cmp_size <= 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("LZ4 compression failed"))); + + *dst_size = lz4_cmp_size; + *dst = buf; +#endif +} + +/* + * Data decompression using LZ4 streaming API. + * LZ4 decompression uses the output ring buffer to store decompressed data, + * thus, we don't need to create a new buffer. We return the pointer to data + * location. + */ +static void +lz4_StreamingDecompressData(MemoryContext context, char *src, Size src_size, + char **dst, Size dst_size, void *compressor_state) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#else + LZ4StreamingCompressorState *cstate; + char *lz4_out_bufPtr; /* output ring buffer pointer */ + int lz4_dec_size; /* decompressed data size */ + + cstate = (LZ4StreamingCompressorState *) compressor_state; + + /* Allocate LZ4 output ring buffer and streaming decompression handler */ + if (cstate->lz4_out_buf == NULL) + lz4_CreateStreamDecodeCompressorState(context, compressor_state); + + /* Ring buffer offset wraparound */ + if ((cstate->lz4_out_buf_offset + dst_size) > LZ4_RING_BUFFER_SIZE) + cstate->lz4_out_buf_offset = 0; + + /* Get current entry pointer in the ring buffer */ + lz4_out_bufPtr = cstate->lz4_out_buf + cstate->lz4_out_buf_offset; + + lz4_dec_size = LZ4_decompress_safe_continue(cstate->lz4_stream_decode, + src, + lz4_out_bufPtr, + src_size, + dst_size); + + Assert(lz4_dec_size == dst_size); + + if (lz4_dec_size < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed LZ4 data is corrupted"))); + else if (lz4_dec_size != dst_size) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("decompressed LZ4 data size differs from original size"))); + + /* Move the output ring buffer offset */ + cstate->lz4_out_buf_offset += lz4_dec_size; + + /* Point to the decompressed data location */ + *dst = lz4_out_bufPtr; +#endif +} + +/* + * Data decompression using LZ4 API. + */ +static void +lz4_DecompressData(char *src, Size src_size, char **dst, Size dst_size) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#else + int lz4_dec_bytes; + char *buf; + + buf = (char *) palloc0(dst_size); + + lz4_dec_bytes = LZ4_decompress_safe(src, buf, src_size, dst_size); + + Assert(lz4_dec_bytes == dst_size); + + if (lz4_dec_bytes < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed LZ4 data is corrupted"))); + else if (lz4_dec_bytes != dst_size) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("decompressed LZ4 data size differs from original size"))); + + *dst = buf; +#endif +} + +/* + * Allocate a new Compressor State, depending on the compression method. + */ +void * +ReorderBufferNewCompressorState(MemoryContext context, int compression_method) +{ + switch (compression_method) + { + case REORDER_BUFFER_LZ4_COMPRESSION: + return lz4_NewCompressorState(context); + break; + case REORDER_BUFFER_NO_COMPRESSION: + default: + return NULL; + break; + } +} + +/* + * Free memory allocated to a Compressor State, depending on the compression + * method. + */ +void +ReorderBufferFreeCompressorState(MemoryContext context, int compression_method, + void *compressor_state) +{ + switch (compression_method) + { + case REORDER_BUFFER_LZ4_COMPRESSION: + return lz4_FreeCompressorState(context, compressor_state); + break; + case REORDER_BUFFER_NO_COMPRESSION: + default: + break; + } +} + +/* + * Ensure the IO buffer is >= sz. + */ +static void +ReorderBufferReserve(ReorderBuffer *rb, Size sz) +{ + if (rb->outbufsize < sz) + { + rb->outbuf = repalloc(rb->outbuf, sz); + rb->outbufsize = sz; + } +} + +/* + * Compress ReorderBuffer content. This function is called in order to compress + * data before spilling on disk. + */ +void +ReorderBufferCompress(ReorderBuffer *rb, ReorderBufferDiskHeader **header, + int compression_method, Size data_size, + void *compressor_state) +{ + ReorderBufferDiskHeader *hdr = *header; + + switch (compression_method) + { + /* No compression */ + case REORDER_BUFFER_NO_COMPRESSION: + { + hdr->comp_strat = REORDER_BUFFER_STRAT_UNCOMPRESSED; + hdr->size = data_size; + hdr->raw_size = data_size - sizeof(ReorderBufferDiskHeader); + + break; + } + /* LZ4 Compression */ + case REORDER_BUFFER_LZ4_COMPRESSION: + { + char *dst = NULL; + Size dst_size = 0; + char *src = (char *) rb->outbuf + sizeof(ReorderBufferDiskHeader); + Size src_size = data_size - sizeof(ReorderBufferDiskHeader); + ReorderBufferCompressionStrategy strat; + + if (lz4_CanDoStreamingCompression(src_size)) + { + /* Use LZ4 streaming compression if possible */ + lz4_StreamingCompressData(rb->context, src, src_size, &dst, + &dst_size, compressor_state); + strat = REORDER_BUFFER_STRAT_LZ4_STREAMING; + } + else + { + /* Fallback to LZ4 regular compression */ + lz4_CompressData(src, src_size, &dst, &dst_size); + strat = REORDER_BUFFER_STRAT_LZ4_REGULAR; + } + + /* + * Make sure the ReorderBuffer has enough space to store compressed + * data. Compressed data must be smaller than raw data, so, the + * ReorderBuffer should already have room for compressed data, but + * we do this to avoid buffer overflow risks. + */ + ReorderBufferReserve(rb, (dst_size + sizeof(ReorderBufferDiskHeader))); + + hdr = (ReorderBufferDiskHeader *) rb->outbuf; + hdr->comp_strat = strat; + hdr->size = dst_size + sizeof(ReorderBufferDiskHeader); + hdr->raw_size = src_size; + + /* + * Update header: hdr pointer has potentially changed due to + * ReorderBufferReserve() + */ + *header = hdr; + + /* Copy back compressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), dst, + dst_size); + + pfree(dst); + + break; + } + } +} + +/* + * Decompress data read from disk and copy it into the ReorderBuffer. + */ +void +ReorderBufferDecompress(ReorderBuffer *rb, char *data, + ReorderBufferDiskHeader *header, void *compressor_state) +{ + Size raw_outbufsize = header->raw_size + sizeof(ReorderBufferDiskHeader); + /* + * Make sure the output reorder buffer has enough space to store + * decompressed/raw data. + */ + if (rb->outbufsize < raw_outbufsize) + { + rb->outbuf = repalloc(rb->outbuf, raw_outbufsize); + rb->outbufsize = raw_outbufsize; + } + + /* Make a copy of the header read on disk into the ReorderBuffer */ + memcpy(rb->outbuf, (char *) header, sizeof(ReorderBufferDiskHeader)); + + switch (header->comp_strat) + { + /* No decompression */ + case REORDER_BUFFER_STRAT_UNCOMPRESSED: + { + /* + * Make a copy of what was read on disk into the reorder + * buffer. + */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), + data, header->raw_size); + break; + } + /* LZ4 regular decompression */ + case REORDER_BUFFER_STRAT_LZ4_REGULAR: + { + char *buf; + Size src_size = header->size - sizeof(ReorderBufferDiskHeader); + Size buf_size = header->raw_size; + + lz4_DecompressData(data, src_size, &buf, buf_size); + + /* Copy decompressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), + buf, buf_size); + + pfree(buf); + break; + } + /* LZ4 streaming decompression */ + case REORDER_BUFFER_STRAT_LZ4_STREAMING: + { + char *buf; + Size src_size = header->size - sizeof(ReorderBufferDiskHeader); + Size buf_size = header->raw_size; + + lz4_StreamingDecompressData(rb->context, data, src_size, &buf, + buf_size, compressor_state); + + /* Copy decompressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), + buf, buf_size); + /* + * Not necessary to free buf in this case: it points to the + * decompressed data stored in LZ4 output ring buffer. + */ + break; + } + default: + /* Other compression methods not yet supported */ + break; + } +} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 851a001c8b..bf979e0b14 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -9,6 +9,10 @@ #ifndef REORDERBUFFER_H #define REORDERBUFFER_H +#ifdef USE_LZ4 +#include +#endif + #include "access/htup_details.h" #include "lib/ilist.h" #include "lib/pairingheap.h" @@ -422,6 +426,8 @@ typedef struct ReorderBufferTXN * Private data pointer of the output plugin. */ void *output_plugin_private; + + void *compressor_state; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ diff --git a/src/include/replication/reorderbuffer_compression.h b/src/include/replication/reorderbuffer_compression.h new file mode 100644 index 0000000000..9aa8aea56f --- /dev/null +++ b/src/include/replication/reorderbuffer_compression.h @@ -0,0 +1,95 @@ +/*------------------------------------------------------------------------- + * + * reorderbuffer_compression.h + * Functions for ReorderBuffer compression. + * + * Copyright (c) 2024-2024, PostgreSQL Global Development Group + * + * src/include/access/reorderbuffer_compression.h + * + *------------------------------------------------------------------------- + */ + +#ifndef REORDERBUFFER_COMPRESSION_H +#define REORDERBUFFER_COMPRESSION_H + +#include "replication/reorderbuffer.h" + +#ifdef USE_LZ4 +#include +#endif + +/* ReorderBuffer on disk compression algorithms */ +typedef enum ReorderBufferCompressionMethod +{ + REORDER_BUFFER_NO_COMPRESSION, + REORDER_BUFFER_LZ4_COMPRESSION, +} ReorderBufferCompressionMethod; + +/* + * Compression strategy applied to ReorderBuffer records spilled on disk + */ +typedef enum ReorderBufferCompressionStrategy +{ + REORDER_BUFFER_STRAT_UNCOMPRESSED, + REORDER_BUFFER_STRAT_LZ4_STREAMING, + REORDER_BUFFER_STRAT_LZ4_REGULAR, +} ReorderBufferCompressionStrategy; + +/* Disk serialization support datastructures */ +typedef struct ReorderBufferDiskHeader +{ + ReorderBufferCompressionStrategy comp_strat; /* Compression strategy */ + Size size; /* Ondisk size */ + Size raw_size; /* Raw/uncompressed data size */ + /* ReorderBufferChange + data follows */ +} ReorderBufferDiskHeader; + +#ifdef USE_LZ4 +/* + * We use a fairly small LZ4 ring buffer size (64kB). Using a larger buffer + * size provide better compression ratio, but as long as we have to allocate + * two LZ4 ring buffers per ReorderBufferTXN, we should keep it small. + */ +#define LZ4_RING_BUFFER_SIZE (64 * 1024) + +/* + * Use LZ4 streaming compression iff we can keep at least 2 uncompressed + * records into the LZ4 input ring buffer. If raw data size is too large, let's + * use regular LZ4 compression. + */ +#define lz4_CanDoStreamingCompression(s) (s < (LZ4_RING_BUFFER_SIZE / 2)) + +/* + * LZ4 streaming compression/decompression handlers and ring + * buffers. + */ +typedef struct LZ4StreamingCompressorState { + /* Streaming compression handler */ + LZ4_stream_t *lz4_stream; + /* Streaming decompression handler */ + LZ4_streamDecode_t *lz4_stream_decode; + /* LZ4 in/out ring buffers used for streaming compression */ + char *lz4_in_buf; + int lz4_in_buf_offset; + char *lz4_out_buf; + int lz4_out_buf_offset; +} LZ4StreamingCompressorState; +#else +#define lz4_CanDoStreamingCompression(s) (false) +#endif + +extern void *ReorderBufferNewCompressorState(MemoryContext context, + int compression_method); +extern void ReorderBufferFreeCompressorState(MemoryContext context, + int compression_method, + void *compressor_state); +extern void ReorderBufferCompress(ReorderBuffer *rb, + ReorderBufferDiskHeader **header, + int compression_method, Size data_size, + void *compressor_state); +extern void ReorderBufferDecompress(ReorderBuffer *rb, char *data, + ReorderBufferDiskHeader *header, + void *compressor_state); + +#endif /* REORDERBUFFER_COMPRESSION_H */ -- 2.43.0