From 5de30bbc5df026aba7e0349d1186c6e34d8d41c9 Mon Sep 17 00:00:00 2001 From: Rahila Syed Date: Fri, 4 Jul 2014 18:48:17 +0530 Subject: [PATCH 2/2] CompressBackupBlock_snappy_lz4_pglz-2 --- src/backend/access/transam/xlog.c | 157 ++++++++++++++++++++++++++++++++++++ src/backend/utils/misc/guc.c | 24 ++++++ src/include/access/xlog.h | 9 ++ src/include/access/xlog_internal.h | 2 +- 4 files changed, 191 insertions(+), 1 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3f92482..39635de 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -60,6 +60,9 @@ #include "storage/spin.h" #include "utils/builtins.h" #include "utils/guc.h" +#include "utils/pg_lzcompress.h" +#include "utils/pg_snappy.h" +#include "utils/pg_lz4.h" #include "utils/ps_status.h" #include "utils/relmapper.h" #include "utils/snapmgr.h" @@ -84,6 +87,7 @@ bool XLogArchiveMode = false; char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; bool fullPageWrites = true; +int compress_backup_block = false; bool wal_log_hints = false; bool log_checkpoints = false; int sync_method = DEFAULT_SYNC_METHOD; @@ -808,6 +812,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); +static char *CompressBackupBlock(char *page, uint32 orig_len, char *dest, uint32 *len); static char *GetXLogBuffer(XLogRecPtr ptr); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); @@ -977,6 +982,34 @@ begin:; if (len == 0 && !isLogSwitch) elog(PANIC, "invalid xlog record length %u", len); + /* Allocates memory for compressed backup blocks according to the compression + * algorithm used.Once per session at the time of insertion of first XLOG + * record. + * This memory stays till the end of session. 
OOM is handled by making the + * code proceed without FPW compression*/ + static char *compressed_pages[XLR_MAX_BKP_BLOCKS]; + static bool compressed_pages_allocated = false; + if (compress_backup_block != BACKUP_BLOCK_COMPRESSION_OFF && + compressed_pages_allocated!= true) + { + size_t buffer_size = VARHDRSZ; + int j; + if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_SNAPPY) + buffer_size += snappy_max_compressed_length(BLCKSZ); + else if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_LZ4) + buffer_size += LZ4_compressBound(BLCKSZ); + else if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_PGLZ) + buffer_size += PGLZ_MAX_OUTPUT(BLCKSZ); + for (j = 0; j < XLR_MAX_BKP_BLOCKS; j++) + { compressed_pages[j] = (char *) malloc(buffer_size); + if(compressed_pages[j] == NULL) + { + compress_backup_block=BACKUP_BLOCK_COMPRESSION_OFF; + break; + } + } + compressed_pages_allocated = true; + } /* * Make additional rdata chain entries for the backup blocks, so that we * don't need to special-case them in the write loop. 
This modifies the @@ -1015,11 +1048,32 @@ begin:; rdt->next = &(dtbuf_rdt2[i]); rdt = rdt->next; + if (compress_backup_block != BACKUP_BLOCK_COMPRESSION_OFF) + { + /* Compress the backup block before including it in rdata chain */ + rdt->data = CompressBackupBlock(page, BLCKSZ - bkpb->hole_length, + compressed_pages[i], &(rdt->len)); + if (rdt->data != NULL) + { + /* + * write_len is the length of compressed block and its varlena + * header + */ + write_len += rdt->len; + bkpb->hole_length = BLCKSZ - rdt->len; + /* Record which compression algorithm was used in the backup block header */ + bkpb->block_compression=compress_backup_block; + rdt->next = NULL; + continue; + } + } + if (bkpb->hole_length == 0) { rdt->data = page; rdt->len = BLCKSZ; write_len += BLCKSZ; + bkpb->block_compression=BACKUP_BLOCK_COMPRESSION_OFF; rdt->next = NULL; } else @@ -1035,6 +1089,7 @@ begin:; rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); write_len += rdt->len; + bkpb->block_compression=BACKUP_BLOCK_COMPRESSION_OFF; rdt->next = NULL; } } @@ -1766,6 +1821,64 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) } /* + * Create a compressed version of a backup block + * + * If successful, return a compressed result and set 'len' to its length. + * Otherwise (i.e., the compressed result is actually bigger than the original), + * return NULL. 
+ */ +static char * +CompressBackupBlock(char *page, uint32 orig_len, char *dest, uint32 *len) +{ + struct varlena *buf = (struct varlena *) dest; + + if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_SNAPPY) + { + int ret; + ret = pg_snappy_compress(page,BLCKSZ,buf); + /* EIO is returned for incompressible data */ + if (ret == EIO ) + return NULL; + } + else if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_LZ4) + { + int ret; + ret = pg_LZ4_compress(page,BLCKSZ,buf); + if (ret == 0 ) + return NULL; + } + else if (compress_backup_block == BACKUP_BLOCK_COMPRESSION_PGLZ) + { + bool ret; + ret = pglz_compress(page, BLCKSZ, + (PGLZ_Header *) buf, PGLZ_strategy_default); + if(!ret) + return NULL; + } + else + elog(ERROR, "Wrong value for compress_backup_block GUC"); + + /* + * We recheck the actual size even if pglz_compress() reports success, + * because it might be satisfied with having saved as little as one byte + * in the compressed data --- which could turn into a net loss once you + * consider header and alignment padding. Worst case, the compressed + * format might require three padding bytes (plus header, which is + * included in VARSIZE(buf)), whereas the uncompressed format would take + * only one header byte and no padding if the value is short enough. So + * we insist on a savings of more than 2 bytes to ensure we have a gain. + */ + if(VARSIZE(buf) < orig_len-2) + /* successful compression */ + { + *len = VARSIZE(buf); + return (char *) buf; + } + else + return NULL; +} + +/* * Get a pointer to the right location in the WAL buffer containing the * given XLogRecPtr. 
* @@ -4061,6 +4174,50 @@ RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, { memcpy((char *) page, blk, BLCKSZ); } + /* Decompress if backup block is compressed*/ + else if (VARATT_IS_COMPRESSED((struct varlena *) blk) + && bkpb.block_compression!=BACKUP_BLOCK_COMPRESSION_OFF) + { + if (bkpb.block_compression == BACKUP_BLOCK_COMPRESSION_SNAPPY) + { + int ret; + size_t compressed_length = VARSIZE((struct varlena *) blk) - VARHDRSZ; + char *compressed_data = (char *)VARDATA((struct varlena *) blk); + size_t s_uncompressed_length; + + ret = snappy_uncompressed_length(compressed_data, + compressed_length, + &s_uncompressed_length); + if (!ret) + elog(ERROR, "snappy: failed to determine compression length"); + if (BLCKSZ != s_uncompressed_length) + elog(ERROR, "snappy: compression size mismatch %d != %zu", + BLCKSZ, s_uncompressed_length); + + ret = snappy_uncompress(compressed_data, + compressed_length, + page); + if (ret != 0) + elog(ERROR, "snappy: decompression failed: %d", ret); + } + else if (bkpb.block_compression == BACKUP_BLOCK_COMPRESSION_LZ4) + { + int ret; + size_t compressed_length = VARSIZE((struct varlena *) blk) - VARHDRSZ; + char *compressed_data = (char *)VARDATA((struct varlena *) blk); + ret = LZ4_decompress_fast(compressed_data, page, + BLCKSZ); + if (ret != compressed_length) + elog(ERROR, "lz4: decompression size mismatch: %d vs %zu", ret, + compressed_length); + } + else if (bkpb.block_compression == BACKUP_BLOCK_COMPRESSION_PGLZ) + { + pglz_decompress((PGLZ_Header *) blk, (char *) page); + } + else + elog(ERROR, "Wrong value for compress_backup_block GUC"); + } else { memcpy((char *) page, blk, bkpb.hole_offset); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 1d094f0..a068e49 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -403,6 +403,20 @@ static const struct config_enum_entry huge_pages_options[] = { {NULL, 0, false} }; + +/* + */ +static const struct 
config_enum_entry backup_block_compression_options[] = { + {"off", BACKUP_BLOCK_COMPRESSION_OFF, false}, + {"false", BACKUP_BLOCK_COMPRESSION_OFF, true}, + {"no", BACKUP_BLOCK_COMPRESSION_OFF, true}, + {"0", BACKUP_BLOCK_COMPRESSION_OFF, true}, + {"pglz", BACKUP_BLOCK_COMPRESSION_PGLZ, true}, + {"snappy", BACKUP_BLOCK_COMPRESSION_SNAPPY, true}, + {"lz4", BACKUP_BLOCK_COMPRESSION_LZ4, true}, + {NULL, 0, false} +}; + /* * Options for enum values stored in other modules */ @@ -3498,6 +3512,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"compress_backup_block", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Compress backup block in WAL using specified compression algorithm."), + NULL + }, + &compress_backup_block, + BACKUP_BLOCK_COMPRESSION_OFF, backup_block_compression_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 1eaa5c1..ad4dbdb 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -177,6 +177,14 @@ typedef enum RECOVERY_TARGET_IMMEDIATE } RecoveryTargetType; +typedef enum +{ + BACKUP_BLOCK_COMPRESSION_OFF, + BACKUP_BLOCK_COMPRESSION_PGLZ, + BACKUP_BLOCK_COMPRESSION_SNAPPY, + BACKUP_BLOCK_COMPRESSION_LZ4 +} BackupBlockCompressionAlgorithm; + extern XLogRecPtr XactLastRecEnd; extern bool reachedConsistency; @@ -190,6 +198,7 @@ extern bool XLogArchiveMode; extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; +extern int compress_backup_block; extern bool wal_log_hints; extern bool log_checkpoints; extern int num_xloginsert_locks; diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 3a692cd..d7dd747 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -48,7 +48,7 @@ typedef struct BkpBlock BlockNumber block; /* block number */ uint16 hole_offset; /* number of 
bytes before "hole" */ uint16 hole_length; /* number of bytes in "hole" */ - + uint8 block_compression; /* compression algorithm used for this block */ /* ACTUAL BLOCK DATA FOLLOWS AT END OF STRUCT */ } BkpBlock; -- 1.7.1