From d346974e1f6f2a505c7a6d083b89f47affa4ebb0 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 7 May 2020 12:22:17 -0400 Subject: [PATCH v2 05/11] Convert throttling-related code to a bbsink. --- src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 123 +--------- src/backend/replication/basebackup_throttle.c | 211 ++++++++++++++++++ src/include/replication/basebackup_sink.h | 1 + 4 files changed, 217 insertions(+), 119 deletions(-) create mode 100644 src/backend/replication/basebackup_throttle.c diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 6adc396501..58b6c228bb 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -19,6 +19,7 @@ OBJS = \ basebackup.o \ basebackup_libpq.o \ basebackup_sink.o \ + basebackup_throttle.o \ repl_gram.o \ slot.o \ slotfuncs.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index a56b0e9813..e0f469e3f2 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -75,7 +75,6 @@ static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf) static void perform_base_backup(basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt); static int compareWalFileNames(const ListCell *a, const ListCell *b); -static void throttle(size_t increment); static void update_basebackup_progress(int64 delta); static bool is_checksummed_file(const char *fullpath, const char *filename); static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset, @@ -92,23 +91,6 @@ static char *statrelpath = NULL; */ #define TAR_SEND_SIZE 32768 -/* - * How frequently to throttle, as a fraction of the specified rate-second. - */ -#define THROTTLING_FREQUENCY 8 - -/* The actual number of bytes, transfer of which may cause sleep. */ -static uint64 throttling_sample; - -/* Amount of data already transferred but not yet throttled. */ -static int64 throttling_counter; - -/* The minimum time required to transfer throttling_sample bytes. */ -static TimeOffset elapsed_min_unit; - -/* The last check of the transfer rate. */ -static TimestampTz throttled_last; - /* The starting XLOG position of the base backup. */ static XLogRecPtr startptr; @@ -262,6 +244,10 @@ perform_base_backup(basebackup_options *opt) List *tablespaces = NIL; bbsink *sink = bbsink_libpq_new(); + /* Set up network throttling, if client requested it */ + if (opt->maxrate > 0) + sink = bbsink_throttle_new(sink, opt->maxrate); + backup_total = 0; backup_streamed = 0; pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid); @@ -370,30 +356,6 @@ perform_base_backup(basebackup_options *opt) /* notify basebackup sink about start of backup */ bbsink_begin_backup(sink, startptr, starttli, tablespaces); - /* Setup and activate network throttling, if client requested it */ - if (opt->maxrate > 0) - { - throttling_sample = - (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY; - - /* - * The minimum amount of time for throttling_sample bytes to be - * transferred. - */ - elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; - - /* Enable throttling. */ - throttling_counter = 0; - - /* The 'real data' starts now (header was ignored). */ - throttled_last = GetCurrentTimestamp(); - } - else - { - /* Disable throttling. */ - throttling_counter = -1; - } - /* Send off our tablespaces one by one */ foreach(lc, tablespaces) { @@ -638,7 +600,6 @@ perform_base_backup(basebackup_options *opt) update_basebackup_progress(cnt); len += cnt; - throttle(cnt); if (len == wal_segment_size) break; @@ -1614,7 +1575,6 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt); len += cnt; - throttle(cnt); } /* If the file was truncated while we were sending it, pad it with zeros */ @@ -1628,7 +1588,6 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt); update_basebackup_progress(cnt); len += cnt; - throttle(cnt); } } @@ -1722,80 +1681,6 @@ convert_link_to_directory(const char *pathbuf, struct stat *statbuf) statbuf->st_mode = S_IFDIR | pg_dir_create_mode; } -/* - * Increment the network transfer counter by the given number of bytes, - * and sleep if necessary to comply with the requested network transfer - * rate. - */ -static void -throttle(size_t increment) -{ - TimeOffset elapsed_min; - - if (throttling_counter < 0) - return; - - throttling_counter += increment; - if (throttling_counter < throttling_sample) - return; - - /* How much time should have elapsed at minimum? */ - elapsed_min = elapsed_min_unit * - (throttling_counter / throttling_sample); - - /* - * Since the latch could be set repeatedly because of concurrently WAL - * activity, sleep in a loop to ensure enough time has passed. - */ - for (;;) - { - TimeOffset elapsed, - sleep; - int wait_result; - - /* Time elapsed since the last measurement (and possible wake up). */ - elapsed = GetCurrentTimestamp() - throttled_last; - - /* sleep if the transfer is faster than it should be */ - sleep = elapsed_min - elapsed; - if (sleep <= 0) - break; - - ResetLatch(MyLatch); - - /* We're eating a potentially set latch, so check for interrupts */ - CHECK_FOR_INTERRUPTS(); - - /* - * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be - * the maximum time to sleep. Thus the cast to long is safe. - */ - wait_result = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - (long) (sleep / 1000), - WAIT_EVENT_BASE_BACKUP_THROTTLE); - - if (wait_result & WL_LATCH_SET) - CHECK_FOR_INTERRUPTS(); - - /* Done waiting? */ - if (wait_result & WL_TIMEOUT) - break; - } - - /* - * As we work with integers, only whole multiple of throttling_sample was - * processed. The rest will be done during the next call of this function. - */ - throttling_counter %= throttling_sample; - - /* - * Time interval for the remaining amount and possible next increments - * starts now. - */ - throttled_last = GetCurrentTimestamp(); -} - /* * Increment the counter for the amount of data already streamed * by the given number of bytes, and update the progress report for diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c new file mode 100644 index 0000000000..0e3b4542bd --- /dev/null +++ b/src/backend/replication/basebackup_throttle.c @@ -0,0 +1,211 @@ +/*------------------------------------------------------------------------- + * + * basebackup_throttle.c + * Basebackup sink implementing throttling. Data is forwarded to the + * next base backup sink in the chain at a rate no greater than the + * configured maximum. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_throttle.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "replication/basebackup_sink.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "utils/timestamp.h" + +typedef struct bbsink_throttle +{ + /* Common information for all types of sink. */ + bbsink base; + + /* The actual number of bytes, transfer of which may cause sleep. */ + uint64 throttling_sample; + + /* Amount of data already transferred but not yet throttled. */ + int64 throttling_counter; + + /* The minimum time required to transfer throttling_sample bytes. */ + TimeOffset elapsed_min_unit; + + /* The last check of the transfer rate. */ + TimestampTz throttled_last; +} bbsink_throttle; + +static void bbsink_throttle_begin_backup(bbsink *sink, + XLogRecPtr startptr, + TimeLineID starttli, + List *tablespaces); +static void bbsink_throttle_archive_contents(bbsink *sink, + const char *data, size_t len); +static void bbsink_throttle_manifest_contents(bbsink *sink, + const char *data, size_t len); +static void throttle(bbsink_throttle *sink, size_t increment); + +const bbsink_ops bbsink_throttle_ops = { + .begin_backup = bbsink_throttle_begin_backup, + .begin_archive = bbsink_forward_begin_archive, + .archive_contents = bbsink_throttle_archive_contents, + .end_archive = bbsink_forward_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_throttle_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup +}; + +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* + * Create a new basebackup sink that performs throttling and forwards data + * to a successor sink. + */ +bbsink * +bbsink_throttle_new(bbsink *next, uint32 maxrate) +{ + bbsink_throttle *sink; + + Assert(next != NULL); + Assert(maxrate > 0); + + sink = palloc0(sizeof(bbsink_throttle)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; + sink->base.bbs_next = next; + + sink->throttling_sample = + (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample bytes to be + * transferred. + */ + sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + return &sink->base; +} + +/* + * There's no real work to do here, but we need to record the current time so + * that it can be used for future calculations. + */ +static void +bbsink_throttle_begin_backup(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, List *tablespaces) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + Assert(sink->bbs_next != NULL); + bbsink_begin_backup(sink->bbs_next, startptr, starttli, tablespaces); + + /* The 'real data' starts now (header was ignored). */ + mysink->throttled_last = GetCurrentTimestamp(); +} + +/* + * First throttle, and then pass archive contents to next sink. + */ +static void +bbsink_throttle_archive_contents(bbsink *sink, const char *data, size_t len) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + throttle(mysink, len); + + Assert(sink->bbs_next != NULL); + bbsink_archive_contents(sink->bbs_next, data, len); +} + +/* + * First throttle, and then pass manifest contents to next sink. + */ +static void +bbsink_throttle_manifest_contents(bbsink *sink, const char *data, size_t len) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + throttle(mysink, len); + + Assert(sink->bbs_next != NULL); + bbsink_manifest_contents(sink->bbs_next, data, len); +} + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(bbsink_throttle *sink, size_t increment) +{ + TimeOffset elapsed_min; + + Assert(sink->throttling_counter >= 0); + + sink->throttling_counter += increment; + if (sink->throttling_counter < sink->throttling_sample) + return; + + /* How much time should have elapsed at minimum? */ + elapsed_min = sink->elapsed_min_unit * + (sink->throttling_counter / sink->throttling_sample); + + /* + * Since the latch could be set repeatedly because of concurrently WAL + * activity, sleep in a loop to ensure enough time has passed. + */ + for (;;) + { + TimeOffset elapsed, + sleep; + int wait_result; + + /* Time elapsed since the last measurement (and possible wake up). */ + elapsed = GetCurrentTimestamp() - sink->throttled_last; + + /* sleep if the transfer is faster than it should be */ + sleep = elapsed_min - elapsed; + if (sleep <= 0) + break; + + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. + */ + wait_result = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + (long) (sleep / 1000), + WAIT_EVENT_BASE_BACKUP_THROTTLE); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + + /* Done waiting? */ + if (wait_result & WL_TIMEOUT) + break; + } + + /* + * As we work with integers, only whole multiple of throttling_sample was + * processed. The rest will be done during the next call of this function. + */ + sink->throttling_counter %= sink->throttling_sample; + + /* + * Time interval for the remaining amount and possible next increments + * starts now. + */ + sink->throttled_last = GetCurrentTimestamp(); +} diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index a8df937957..bc1710e2eb 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -175,5 +175,6 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, /* Constructors for various types of sinks. */ extern bbsink *bbsink_libpq_new(void); +extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); #endif -- 2.24.3 (Apple Git-128)