From 291072c1fce4717cef3e20b15fdcbeaf143d5e78 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 1 May 2020 14:11:33 -0400 Subject: [PATCH v2 03/11] Introduce bbsink abstraction. --- src/backend/replication/Makefile | 1 + src/backend/replication/basebackup_sink.c | 72 +++++++++ src/include/replication/basebackup_sink.h | 176 ++++++++++++++++++++++ 3 files changed, 249 insertions(+) create mode 100644 src/backend/replication/basebackup_sink.c create mode 100644 src/include/replication/basebackup_sink.h diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index a0381e52f3..25d56478f4 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -17,6 +17,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ backup_manifest.o \ basebackup.o \ + basebackup_sink.o \ repl_gram.o \ slot.o \ slotfuncs.o \ diff --git a/src/backend/replication/basebackup_sink.c b/src/backend/replication/basebackup_sink.c new file mode 100644 index 0000000000..bd0298990d --- /dev/null +++ b/src/backend/replication/basebackup_sink.c @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------- + * + * basebackup_sink.c + * Default implementations for bbsink (basebackup sink) callbacks. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * src/backend/replication/basebackup_sink.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "replication/basebackup_sink.h" + +void +bbsink_forward_begin_backup(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, List *tablespaces) +{ + Assert(sink->bbs_next != NULL); + bbsink_begin_backup(sink->bbs_next, startptr, starttli, tablespaces); +} + +void +bbsink_forward_begin_archive(bbsink *sink, const char *archive_name) +{ + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, archive_name); +} + +void +bbsink_forward_archive_contents(bbsink *sink, const char *data, size_t len) +{ + Assert(sink->bbs_next != NULL); + bbsink_archive_contents(sink->bbs_next, data, len); +} + +void +bbsink_forward_end_archive(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_archive(sink->bbs_next); +} + +void +bbsink_forward_begin_manifest(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_begin_manifest(sink->bbs_next); +} + +void +bbsink_forward_manifest_contents(bbsink *sink, const char *data, size_t len) +{ + Assert(sink->bbs_next != NULL); + bbsink_manifest_contents(sink->bbs_next, data, len); +} + +void +bbsink_forward_end_manifest(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_manifest(sink->bbs_next); +} + +void +bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_backup(sink->bbs_next, endptr, endtli); +} diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h new file mode 100644 index 0000000000..050cf1180d --- /dev/null +++ b/src/include/replication/basebackup_sink.h @@ -0,0 +1,176 @@ +/*------------------------------------------------------------------------- + * + * basebackup_sink.h + * API for filtering or sending to a final destination the archives + * produced by the base backup process + * + * From a logical point of view, a basebackup sink's callbacks are invoked + * after the source files read from the data directory have been assembled + * into archives (e.g. by creating one tar file per tablespace) but before + * those archives are sent to the client. In reality, processing is + * interleaved, with archives being generated incrementally and these + * callbacks being invoked on the archive fragment as they are generated. + * The point, however, is that a basebackup sink shouldn't be trying to + * do anything with individual data files, nor should it do anything that + * depends on a particular choice of archive format. It should only + * perform processing that treats the archives passed to it -- and the + * backup manifest -- as opaque blobs of bytes. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * src/include/replication/basebackup_sink.h + * + *------------------------------------------------------------------------- + */ +#ifndef BASEBACKUP_SINK_H +#define BASEBACKUP_SINK_H + +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" + +/* Forward declarations. */ +struct bbsink; +struct bbsink_ops; +typedef struct bbsink bbsink; +typedef struct bbsink_ops bbsink_ops; + +/* + * Common data for any type of basebackup sink. + * + * 'bbs_ops' is the relevant callback table. + * + * 'bbs_next' is a pointer to another bbarchiver to which this bbarchiver is + * forwarding some or all operations. + * + * If a bbsink needs to store additional state, it can allocate a larger + * structure whose first element is a bbsink. + */ +struct bbsink +{ + const bbsink_ops *bbs_ops; + bbsink *bbs_next; +}; + +/* + * Callbacks for a base backup sink. + * + * All of these callbacks are required. If a particular callback just needs to + * forward the call to sink->bbs_next, use bbsink_forward_ as + * the callback. + * + * Callers should always invoke these callbacks via the bbsink_* inline functions + * rather than calling them directly. + */ +struct bbsink_ops +{ + /* This callback is invoked just once, at the very start of the backup. */ + void (*begin_backup)(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, List *tablespaces); + + /* + * For each archive produced by the backup process, there will be one call + * to the begin_archive() callback, some number of calls to the + * archive_contents() callback, and then one call to the end_archive() + * callback. + */ + void (*begin_archive)(bbsink *sink, const char *archive_name); + void (*archive_contents)(bbsink *sink, const char *data, size_t len); + void (*end_archive)(bbsink *sink); + + /* + * After all archives have been sent, and provided that the caller has + * requested a backup manifest, there will be one call to the + * begin_manifest() callback, some number of calls to the + * manifest_contents() callback, and then one call to the end_manifest() + * callback. + */ + void (*begin_manifest)(bbsink *sink); + void (*manifest_contents)(bbsink *sink, const char *data, size_t len); + void (*end_manifest)(bbsink *sink); + + /* This callback is invoked just once, at the very end of the backup. */ + void (*end_backup)(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli); +}; + +/* Begin a backup. */ +static inline void +bbsink_begin_backup(bbsink *sink, XLogRecPtr startptr, TimeLineID starttli, + List *tablespaces) +{ + Assert(sink != NULL); + sink->bbs_ops->begin_backup(sink, startptr, starttli, tablespaces); +} + +/* Begin an archive. */ +static inline void +bbsink_begin_archive(bbsink *sink, const char *archive_name) +{ + Assert(sink != NULL); + sink->bbs_ops->begin_archive(sink, archive_name); +} + +/* Process some of the contents of an archive. */ +static inline void +bbsink_archive_contents(bbsink *sink, const char *data, size_t len) +{ + Assert(sink != NULL); + sink->bbs_ops->archive_contents(sink, data, len); +} + +/* Finish an archive. */ +static inline void +bbsink_end_archive(bbsink *sink) +{ + Assert(sink != NULL); + sink->bbs_ops->end_archive(sink); +} + +/* Begin the backup manifest. */ +static inline void +bbsink_begin_manifest(bbsink *sink) +{ + Assert(sink != NULL); + sink->bbs_ops->begin_manifest(sink); +} + +/* Process some of the manifest contents. */ +static inline void +bbsink_manifest_contents(bbsink *sink, const char *data, size_t len) +{ + Assert(sink != NULL); + sink->bbs_ops->manifest_contents(sink, data, len); +} + +/* Finish the backup manifest. */ +static inline void +bbsink_end_manifest(bbsink *sink) +{ + Assert(sink != NULL); + sink->bbs_ops->end_manifest(sink); +} + +/* Finish a backup. */ +static inline void +bbsink_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli) +{ + Assert(sink != NULL); + sink->bbs_ops->end_backup(sink, endptr, endtli); +} + +/* Forwarding callbacks. Use these to pass operations through to next sink. */ +extern void bbsink_forward_begin_backup(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, + List *tablespaces); +extern void bbsink_forward_begin_archive(bbsink *sink, + const char *archive_name); +extern void bbsink_forward_archive_contents(bbsink *sink, const char *data, + size_t len); +extern void bbsink_forward_end_archive(bbsink *sink); +extern void bbsink_forward_begin_manifest(bbsink *sink); +extern void bbsink_forward_manifest_contents(bbsink *sink, const char *data, + size_t len); +extern void bbsink_forward_end_manifest(bbsink *sink); +extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); + +#endif -- 2.24.3 (Apple Git-128)