diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml index 0e6b636703..78921fca59 100644 --- a/doc/src/sgml/ref/pg_receivewal.sgml +++ b/doc/src/sgml/ref/pg_receivewal.sgml @@ -93,6 +93,26 @@ PostgreSQL documentation + + + + Command to execute once a WAL segment is completed. Any + %f in the string is replaced by the same of the WAL segment + that has just been completed. This is useful to perform extra actions + that need to be run once a segment has been completed without relying + on any external utilities, like copying the just-finished segment into + a secundary location, perform extra sanity checks on it or for example + perform cleanup actions on all the WAL segments already archived like + a cleanup of the oldest segments saved. Here is a simple example of + command: + +--end-segment-command="cp /mnt/server/archive/%f /mnt/server2/archive/" + + + + + + diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 4b75e765bb..07e708b011 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -489,6 +489,7 @@ LogStreamerMain(logstreamer_param *param) stream.partial_suffix = NULL; stream.replication_slot = replication_slot; stream.temp_slot = param->temp_slot; + stream.end_segment_cmd = NULL; if (stream.temp_slot && !stream.replication_slot) stream.replication_slot = psprintf("pg_basebackup_%d", (int) getpid()); diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 15348ada58..5640785823 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -42,6 +42,7 @@ static bool slot_exists_ok = false; static bool do_drop_slot = false; static bool synchronous = false; static char *replication_slot = NULL; +static char *end_segment_cmd = NULL; static void usage(void); @@ -83,6 +84,10 @@ usage(void) " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); printf(_(" --synchronous flush transaction log immediately after writing\n")); + printf(_(" --end-segment-command\n" + " custom command executed node a segment completes.\n" + " %%f can be used as placeholder to define the\n" + " name of the segment name.\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -Z, --compress=0-9 compress logs with given compression level\n")); @@ -418,6 +423,7 @@ StreamLog(void) stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; stream.temp_slot = false; + stream.end_segment_cmd = end_segment_cmd; ReceiveXlogStream(conn, &stream); @@ -473,6 +479,7 @@ main(int argc, char **argv) {"drop-slot", no_argument, NULL, 2}, {"if-not-exists", no_argument, NULL, 3}, {"synchronous", no_argument, NULL, 4}, + {"end-segment-command", required_argument, NULL, 5}, {NULL, 0, NULL, 0} }; @@ -570,6 +577,9 @@ main(int argc, char **argv) case 4: synchronous = true; break; + case 5: + end_segment_cmd = pg_strdup(optarg); + break; default: /* diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index f415135172..51776e5958 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -54,6 +54,7 @@ static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_ti static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); +static bool runEndSegmentCommand(StreamCtl *stream, XLogRecPtr blockpos); static bool mark_file_as_archived(StreamCtl *stream, const char *fname) @@ -749,6 +750,79 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) } /* + * Run command provided by user once a segment is completed. Returns true + * if the command succeeds, and false otherwise. + */ +static bool +runEndSegmentCommand(StreamCtl *stream, XLogRecPtr blockpos) +{ + char endSegmentCmd[MAXPGPATH]; + char xlogfname[MAXPGPATH]; + char *dp, *endp, *sp; + XLogSegNo segno; + int rc; + + Assert(stream->end_segment_cmd != NULL); + + /* + * Build the name of the segment just completed. This takes into + * account compressed segments. + */ + XLByteToPrevSeg(blockpos, segno); + XLogFileName(xlogfname, stream->timeline, segno); + if (stream->walmethod->get_compression() > 0) + { + snprintf(xlogfname, MAXPGPATH, "%s.gz", xlogfname); + } + + /* Construct the command to be executed */ + dp = endSegmentCmd; + endp = endSegmentCmd + MAXPGPATH - 1; + *endp = '\0'; + + /* + * Check presence of placeholders in the command provided and replace + * them accordingly + */ + for (sp = stream->end_segment_cmd; *sp; sp++) + { + if (*sp == '%') + { + switch (sp[1]) + { + case 'f': + /* %f: filename of just-completed segment file */ + sp++; + StrNCpy(dp, xlogfname, endp - dp); + dp += strlen(dp); + break; + default: + /* otherwise treat the % as not special */ + if (dp < endp) + *dp++ = *sp; + break; + } + } + else + { + if (dp < endp) + *dp++ = *sp; + } + } + *dp = '\0'; + + /* And now run the command */ + rc = system(endSegmentCmd); + if (rc != 0) + { + fprintf(stderr, _("%s: failed to run end-of-segment command \"%s\"\n"), + progname, endSegmentCmd); + return false; + } + return true; +} + +/* * The main loop of ReceiveXlogStream. Handles the COPY stream after * initiating streaming with the START_STREAMING command. * @@ -1174,6 +1248,10 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, still_sending = false; return true; /* ignore the rest of this XLogData packet */ } + + /* Run custom end-of-segment command */ + if (stream->end_segment_cmd != NULL) + runEndSegmentCommand(stream, *blockpos); } } /* No more data left to write, receive next copy packet */ diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 42e93ac745..f8610da079 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -46,6 +46,8 @@ typedef struct StreamCtl char *partial_suffix; /* Suffix appended to partially received files */ char *replication_slot; /* Replication slot to use, or NULL */ bool temp_slot; /* Create temporary replication slot */ + char *end_segment_cmd; /* Custom command run each time a segment + * is completed */ } StreamCtl; diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index d9ad596bf0..7c00a38dd5 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -310,6 +310,12 @@ dir_get_file_size(const char *pathname) return statbuf.st_size; } +static int +dir_get_compression(void) +{ + return dir_data->compression; +} + static bool dir_existsfile(const char *pathname) { @@ -351,6 +357,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) method->open_for_write = dir_open_for_write; method->write = dir_write; method->get_current_pos = dir_get_current_pos; + method->get_compression = dir_get_compression; method->get_file_size = dir_get_file_size; method->close = dir_close; method->sync = dir_sync; @@ -675,6 +682,12 @@ tar_get_file_size(const char *pathname) return -1; } +static int +tar_get_compression(void) +{ + return tar_data->compression; +} + static off_t tar_get_current_pos(Walfile f) { @@ -953,6 +966,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync) method->open_for_write = tar_open_for_write; method->write = tar_write; method->get_current_pos = tar_get_current_pos; + method->get_compression = tar_get_compression; method->get_file_size = tar_get_file_size; method->close = tar_close; method->sync = tar_sync; diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 8d679dab61..2948ebdffa 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -26,6 +26,7 @@ struct WalWriteMethod int (*close) (Walfile f, WalCloseMethod method); bool (*existsfile) (const char *pathname); ssize_t (*get_file_size) (const char *pathname); + int (*get_compression) (void); ssize_t (*write) (Walfile f, const void *buf, size_t count); off_t (*get_current_pos) (Walfile f);