From 5bf9908cf10918a2ae43ceadd44d6c9089736cff Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 6 Mar 2026 10:31:35 +0100 Subject: [PATCH v12] Adding new init modes to pgbench including COPY FROM BINARY as well as populating data in multiple transactions --- src/bin/pgbench/pgbench.c | 802 ++++++++++++++++--- src/bin/pgbench/t/001_pgbench_with_server.pl | 236 +++++- 2 files changed, 906 insertions(+), 132 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1dae918cc09..48f2950e245 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -159,18 +159,36 @@ typedef struct socket_set /******************************************************************** * some configurable parameters */ - #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ -#define ALL_INIT_STEPS "dtgGvpf" /* all possible steps */ +#define ALL_INIT_STEPS "dtMScgGUvpf" /* all possible steps */ #define LOG_STEP_SECONDS 5 /* seconds between log messages */ -#define DEFAULT_NXACTS 10 /* default nxacts */ +#define DEFAULT_NXACTS 10 /* default nxacts */ #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */ #define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */ #define MAX_ZIPFIAN_PARAM 1000.0 /* maximum parameter for zipfian */ +/* server-side methods to generate data */ +#define INIT_STEP_GEN_TYPE_INSERT_SERIES 'G' /* use INSERT .. SELECT + * generate_series to generate + * data */ +#define INIT_STEP_GEN_TYPE_INSERT_UNNEST 'U' /* use INSERT .. SELECT unnest + * to generate data */ +/* client-side methods to generate data */ +#define INIT_STEP_GEN_TYPE_COPY_TEXT 'g' /* use COPY .. FROM STDIN .. + * TEXT to generate data */ +#define INIT_STEP_GEN_TYPE_COPY_BINARY 'c' /* use COPY .. FROM STDIN .. + * BINARY to generate data */ +/* data init pseudo steps */ +#define INIT_STEP_GEN_TYPE_SINGLE_XACT 'S' /* switch to init data as + * single transaction */ +#define INIT_STEP_GEN_TYPE_MULTI_XACT 'M' /* switch to init data as + * multiple transactions */ + +static bool multi_xact = false; /* init data type (as single or multiple + * transactions) */ static int nxacts = 0; /* number of transactions per client */ static int duration = 0; /* duration in seconds */ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ @@ -181,6 +199,19 @@ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ */ static int scale = 1; +/* + * mode of data generation to use + */ +static char data_generation_type = INIT_STEP_GEN_TYPE_COPY_TEXT; + +/* + * COPY FROM BINARY execution buffer + */ +#define BIN_COPY_BUF_SIZE 102400 /* maximum buffer size for COPY FROM + * BINARY */ +static char *bin_copy_buffer = NULL; /* buffer for COPY FROM BINARY */ +static int32_t bin_copy_buffer_length = 0; /* current buffer size */ + /* * fillfactor. for example, fillfactor = 90 will use only 90 percent * space during inserts and leave 10 percent free. @@ -456,6 +487,9 @@ typedef struct StatsData */ static pg_time_usec_t epoch_shift; +/* used to track elapsed time and estimate of the remaining time of data load */ +static pg_time_usec_t data_load_start; + /* * Error status for errors during script execution. */ @@ -851,6 +885,7 @@ static bool socket_has_input(socket_set *sa, int fd, int idx); /* callback used to build rows for COPY during data loading */ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); +typedef void (*initRowMethodBinary) (PGconn *con, int64_t curr, int32_t parent); /* callback functions for our flex lexer */ static const PsqlScanCallbacks pgbench_callbacks = { @@ -913,8 +948,14 @@ usage(void) " run selected initialization steps, in the specified order\n" " d: drop any existing pgbench tables\n" " t: create the tables used by the standard pgbench scenario\n" - " g: generate data, client-side\n" - " G: generate data, server-side\n" + " to generate data, client-side:\n" + " g: COPY .. FROM STDIN .. TEXT\n" + " c: COPY .. FROM STDIN .. BINARY\n" + " to generate data, server-side:\n" + " G: INSERT .. SELECT generate_series\n" + " U: INSERT .. SELECT unnest\n" + " S: flag to use single transaction to initialize data\n" + " M: flag to use multiple transactions to initialize data\n" " v: invoke VACUUM on the standard tables\n" " p: create primary key indexes on the standard tables\n" " f: create foreign keys between the standard tables\n" @@ -4916,8 +4957,8 @@ initCreateTables(PGconn *con) static const struct ddlinfo DDLs[] = { { "pgbench_history", - "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)", - "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)", + "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22) default ''", + "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22) default ''", 0 }, { @@ -4928,8 +4969,8 @@ initCreateTables(PGconn *con) }, { "pgbench_accounts", - "aid int not null,bid int,abalance int,filler char(84)", - "aid bigint not null,bid int,abalance int,filler char(84)", + "aid int not null,bid int,abalance int,filler char(84) default ''", + "aid bigint not null,bid int,abalance int,filler char(84) default ''", 1 }, { @@ -5025,31 +5066,89 @@ initAccount(PQExpBufferData *sql, int64 curr) } static void -initPopulateTable(PGconn *con, const char *table, int64 base, - initRowMethod init_row) +showPopulateTableCopyProgress(const char *table, int64 current, int64 total) +{ + static int chars = 0; + static int prev_chars = 0; + static int log_interval = 1; + + /* Stay on the same line if reporting to a terminal */ + char eol = isatty(fileno(stderr)) ? '\r' : '\n'; + + double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - data_load_start); + double remaining_sec = ((double) total - current) * elapsed_sec / current; + + /* + * If we want to stick with the original logging, print a message each + * 100k inserted rows. + */ + if ((!use_quiet) && (current % 100000 == 0)) + { + chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", + current, total, + (int) ((current * 100) / total), + table, elapsed_sec, remaining_sec); + + /* + * If the previous progress message is longer than the current one, + * add spaces to the current line to fully overwrite any remaining + * characters from the previous message. + */ + if (prev_chars > chars) + fprintf(stderr, "%*c", prev_chars - chars, ' '); + fputc(eol, stderr); + prev_chars = chars; + } + /* let's not call the timing for each row, but only each 100 rows */ + else if (use_quiet && (current % 100 == 0)) + { + /* have we reached the next interval (or end)? */ + if ((current == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) + { + chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", + current, total, + (int) ((current * 100) / total), + table, elapsed_sec, remaining_sec); + + /* + * If the previous progress message is longer than the current + * one, add spaces to the current line to fully overwrite any + * remaining characters from the previous message. + */ + if (prev_chars > chars) + fprintf(stderr, "%*c", prev_chars - chars, ' '); + fputc(eol, stderr); + prev_chars = chars; + + /* skip to the next interval */ + log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS); + } + } + + if (current + 1 == total && chars != 0) + { + fprintf(stderr, "%*c", chars, ' '); /* Clear the current line */ + fputc(eol, stderr); + } +} + +static void +initPopulateTableCopyText(PGconn *con, const char *table, int counter, int64 base, + initRowMethod init_row) { int n; - int64 k; - int chars = 0; - int prev_chars = 0; PGresult *res; PQExpBufferData sql; char copy_statement[256]; const char *copy_statement_fmt = "copy %s from stdin"; - int64 total = base * scale; - - /* used to track elapsed time and estimate of the remaining time */ - pg_time_usec_t start; - int log_interval = 1; - - /* Stay on the same line if reporting to a terminal */ - char eol = isatty(fileno(stderr)) ? '\r' : '\n'; + int64 start = base * counter; initPQExpBuffer(&sql); /* Use COPY with FREEZE on v14 and later for all ordinary tables */ if ((PQserverVersion(con) >= 140000) && - get_table_relkind(con, table) == RELKIND_RELATION) + get_table_relkind(con, table) == RELKIND_RELATION && + !multi_xact) copy_statement_fmt = "copy %s from stdin with (freeze on)"; @@ -5065,75 +5164,18 @@ initPopulateTable(PGconn *con, const char *table, int64 base, pg_fatal("unexpected copy in result: %s", PQerrorMessage(con)); PQclear(res); - start = pg_time_now(); - - for (k = 0; k < total; k++) + for (int64_t i = start; i < start + base; i++) { - int64 j = k + 1; - - init_row(&sql, k); + init_row(&sql, i); if (PQputline(con, sql.data)) pg_fatal("PQputline failed"); if (CancelRequested) break; - /* - * If we want to stick with the original logging, print a message each - * 100k inserted rows. - */ - if ((!use_quiet) && (j % 100000 == 0)) - { - double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); - double remaining_sec = ((double) total - j) * elapsed_sec / j; - - chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", - j, total, - (int) ((j * 100) / total), - table, elapsed_sec, remaining_sec); - - /* - * If the previous progress message is longer than the current - * one, add spaces to the current line to fully overwrite any - * remaining characters from the previous message. - */ - if (prev_chars > chars) - fprintf(stderr, "%*c", prev_chars - chars, ' '); - fputc(eol, stderr); - prev_chars = chars; - } - /* let's not call the timing for each row, but only each 100 rows */ - else if (use_quiet && (j % 100 == 0)) - { - double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); - double remaining_sec = ((double) total - j) * elapsed_sec / j; - - /* have we reached the next interval (or end)? */ - if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) - { - chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", - j, total, - (int) ((j * 100) / total), - table, elapsed_sec, remaining_sec); - - /* - * If the previous progress message is longer than the current - * one, add spaces to the current line to fully overwrite any - * remaining characters from the previous message. - */ - if (prev_chars > chars) - fprintf(stderr, "%*c", prev_chars - chars, ' '); - fputc(eol, stderr); - prev_chars = chars; - - /* skip to the next interval */ - log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS); - } - } + showPopulateTableCopyProgress(table, i, base * scale); } - if (chars != 0 && eol != '\n') - fprintf(stderr, "%*c\r", chars, ' '); /* Clear the current line */ if (PQputline(con, "\\.\n")) pg_fatal("very last PQputline failed"); @@ -5150,9 +5192,9 @@ initPopulateTable(PGconn *con, const char *table, int64 base, * a blank-padded string in pgbench_accounts. */ static void -initGenerateDataClientSide(PGconn *con) +initGenerateDataClientSideTextFrmt(PGconn *con) { - fprintf(stderr, "generating data (client-side)...\n"); + fprintf(stderr, "TEXT mode...\n"); /* * we do all of this in one transaction to enable the backend's @@ -5163,64 +5205,549 @@ initGenerateDataClientSide(PGconn *con) /* truncate away any old data */ initTruncateTables(con); - /* - * fill branches, tellers, accounts in that order in case foreign keys - * already exist + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + /* + * fill branches, tellers, accounts in that order in case foreign keys + * already exist + */ + initPopulateTableCopyText(con, "pgbench_branches", i, nbranches, initBranch); + initPopulateTableCopyText(con, "pgbench_tellers", i, ntellers, initTeller); + initPopulateTableCopyText(con, "pgbench_accounts", i, naccounts, initAccount); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); +} + + +/* + * Save char data to buffer + * Kept as separate proc for possible addition of something + * like addCharColumn in future + */ +static void +bufferCharData(char *src, int32_t len) +{ + Assert(bin_copy_buffer_length + len <= BIN_COPY_BUF_SIZE); + + memcpy((char *) bin_copy_buffer + bin_copy_buffer_length, (char *) src, len); + bin_copy_buffer_length += len; +} + +/* + * Converts platform byte order into network byte order + * SPARC doesn't reqire that + */ +static void +bufferData(void *src, int32_t len) +{ + Assert(bin_copy_buffer_length + len <= BIN_COPY_BUF_SIZE); + +#ifdef __sparc__ + bufferCharData(src, len); +#else + + if (len == 1) + bufferCharData(src, len); + else + { + for (int32_t i = 0; i < len; i++) + { + ((char *) bin_copy_buffer + bin_copy_buffer_length)[i] = + ((char *) src)[len - i - 1]; + } + + bin_copy_buffer_length += len; + } +#endif +} + +/* + * adds column counter + */ +static void +addColumnCounter(int16_t n) +{ + bufferData((void *) &n, sizeof(n)); +} + +/* + * adds column with inti32 value + */ +static void +addInt32Column(int32_t value) +{ + int32_t data = value; + int32_t size = sizeof(data); + + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * adds column with inti64 value + */ +static void +addInt64Column(int64_t value) +{ + int64_t data = value; + int32_t size = sizeof(data); + + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * Starts communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyHeader(PGconn *con) +{ + static char header[] = {'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0', + '\0', '\0', '\0', '\0', + '\0', '\0', '\0', '\0'}; + + PQputCopyData(con, header, 19); +} + +/* + * Finishes communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyTrailer(PGconn *con) +{ + static char trailer[] = {0xFF, 0xFF}; + + PQputCopyData(con, trailer, 2); +} + +/* + * Flashes current buffer over network if needed + */ +static void +flushBuffer(PGconn *con, int16_t row_len) +{ + PGresult *res; + + if (bin_copy_buffer_length + row_len > BIN_COPY_BUF_SIZE) + { + res = PQgetResult(con); + + Assert(bin_copy_buffer_length <= BIN_COPY_BUF_SIZE); + + /* flush current buffer */ + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + else + pg_fatal("It is NOT a COPY command that is currently running"); + + PQclear(res); + bin_copy_buffer_length = 0; + } +} + +/* + * Sends current branch row to buffer + */ +static void +initBranchBinary(PGconn *con, int64_t curr, int32_t parent) +{ + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of branches table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - branches table has following columns: + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for bbalance column or sizeof(int32_t) + * - 88 bytes for filler column (optional since no requirement for row length) + *--- */ - initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 4) + (4 + 4) + (4 + 88); - executeStatement(con, "commit"); + flushBuffer(con, max_row_len); + + addColumnCounter(2); + + addInt32Column(curr + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ } /* - * Fill the standard tables with some data generated on the server - * - * As already the case with the client-side data generation, the filler - * column defaults to NULL in pgbench_branches and pgbench_tellers, - * and is a blank-padded string in pgbench_accounts. + * Sends current teller row to buffer */ static void -initGenerateDataServerSide(PGconn *con) +initTellerBinary(PGconn *con, int64_t curr, int32_t parent) { - PQExpBufferData sql; + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of tellers table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - tellers table has following columns: + * - 4 bytes for tid column or sizeof(int32_t) + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for tbalance column or sizeof(int32_t) + * - 84 bytes for filler column (optional since no requirement for row length) + *--- + */ + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 4) + (4 + 4) + (4 + 4) + (4 + 84); + + flushBuffer(con, max_row_len); + + addColumnCounter(3); + + addInt32Column(curr + 1); + addInt32Column(curr / parent + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ +} + +/* + * Sends current account row to buffer + */ +static void +initAccountBinary(PGconn *con, int64_t curr, int32_t parent) +{ + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of accounts table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - accounts table has following columns (taking into account scale > 20000): + * - 8 bytes for aid column or sizeof(int64_t) + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for abalance column or sizeof(int32_t) + * - 84 bytes for filler column (optional since no requirement for row length) + *--- + */ + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 8) + (4 + 4) + (4 + 4) + (4 + 84); + + flushBuffer(con, max_row_len); + + addColumnCounter(3); + + if (scale <= SCALE_32BIT_THRESHOLD) + addInt32Column(curr + 1); + else + addInt64Column(curr); + + addInt32Column(curr / parent + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ +} + +/* + * Universal wrapper for sending data in binary format + */ +static void +initPopulateTableCopyBinary(PGconn *con, char *table, char *columns, + int counter, int64_t base, initRowMethodBinary init_row) +{ + int n; + PGresult *res; + char copy_statement[256]; + const char *copy_statement_fmt = "copy %s (%s) from stdin (format binary)"; + int64_t start = base * counter; + + bin_copy_buffer_length = 0; + + /* Use COPY with FREEZE on v14 and later for all ordinary tables */ + if ((PQserverVersion(con) >= 140000) && + get_table_relkind(con, table) == RELKIND_RELATION && + !multi_xact) + copy_statement_fmt = "copy %s (%s) from stdin with (format binary, freeze on)"; + + n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table, columns); + if (n >= sizeof(copy_statement)) + pg_fatal("invalid buffer size: must be at least %d characters long", n); + else if (n == -1) + pg_fatal("invalid format string"); + + res = PQexec(con, copy_statement); + + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("unexpected copy in result: %s", PQerrorMessage(con)); + PQclear(res); + + + sendBinaryCopyHeader(con); + + + for (int64_t i = start; i < start + base; i++) + { + init_row(con, i, base); + + if (CancelRequested) + break; + + showPopulateTableCopyProgress(table, i, base * scale); + } + + res = PQgetResult(con); + + Assert(bin_copy_buffer_length <= BIN_COPY_BUF_SIZE); + + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + else + fprintf(stderr, "Unexpected mode %d instead of %d\n", PQresultStatus(res), PGRES_COPY_IN); + PQclear(res); + + + sendBinaryCopyTrailer(con); + + + res = PQgetResult(con); + if (PQresultStatus(res) == PGRES_COPY_IN) + { + if (PQputCopyEnd(con, NULL) == 1) /* success */ + { + PQclear(res); + res = PQgetResult(con); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + } + else + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + } + PQclear(res); +} + +/* + * Wrapper for binary data load + */ +static void +initGenerateDataClientSideBinaryFrmt(PGconn *con) +{ + + fprintf(stderr, "BINARY mode...\n"); - fprintf(stderr, "generating data (server-side)...\n"); + bin_copy_buffer = pg_malloc(BIN_COPY_BUF_SIZE); + bin_copy_buffer_length = 0; /* - * we do all of this in one transaction to enable the backend's - * data-loading optimizations + * we do all of this in multiple transactions to minimize load on DB + * server and perhaps in future allow load in parallel sessions */ executeStatement(con, "begin"); /* truncate away any old data */ initTruncateTables(con); + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + initPopulateTableCopyBinary(con, "pgbench_branches", "bid, bbalance", + i, nbranches, initBranchBinary); + initPopulateTableCopyBinary(con, "pgbench_tellers", "tid, bid, tbalance", + i, ntellers, initTellerBinary); + initPopulateTableCopyBinary(con, "pgbench_accounts", "aid, bid, abalance", + i, naccounts, initAccountBinary); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); + + pg_free(bin_copy_buffer); +} + +/* + * Fill the standard tables with some data generated and sent from the client. + */ +static void +initGenerateDataClientSide(PGconn *con) +{ + fprintf(stderr, "generating data (client-side as %s transaction%s) in ", + multi_xact ? "multiple" : "single", multi_xact ? "s" : ""); + + data_load_start = pg_time_now(); + + switch (data_generation_type) + { + case INIT_STEP_GEN_TYPE_COPY_TEXT: + initGenerateDataClientSideTextFrmt(con); + break; + case INIT_STEP_GEN_TYPE_COPY_BINARY: + initGenerateDataClientSideBinaryFrmt(con); + break; + } +} + +/* + * Generating data via INSERT .. SELECT .. FROM generate_series + * Possibly as "One transaction per scale" in multi-transaction mode + */ +static void +generateDataInsertSeries(PGconn *con) +{ + PQExpBufferData sql; + + fprintf(stderr, "via INSERT .. SELECT generate_series... in multiple TXN(s)\n"); + initPQExpBuffer(&sql); - printfPQExpBuffer(&sql, - "insert into pgbench_branches(bid,bbalance) " - "select bid, 0 " - "from generate_series(1, %d) as bid", nbranches * scale); - executeStatement(con, sql.data); - - printfPQExpBuffer(&sql, - "insert into pgbench_tellers(tid,bid,tbalance) " - "select tid, (tid - 1) / %d + 1, 0 " - "from generate_series(1, %d) as tid", ntellers, ntellers * scale); - executeStatement(con, sql.data); - - printfPQExpBuffer(&sql, - "insert into pgbench_accounts(aid,bid,abalance,filler) " - "select aid, (aid - 1) / %d + 1, 0, '' " - "from generate_series(1, " INT64_FORMAT ") as aid", - naccounts, (int64) naccounts * scale); - executeStatement(con, sql.data); + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid, bbalance) " + "values(%d, 0)", i + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select tid + 1, tid / %d + 1, 0 " + "from generate_series(%d, %d) as tid", + ntellers, i * ntellers, (i + 1) * ntellers - 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_accounts(aid, bid, abalance, " + "filler) " + "select aid + 1, aid / %d + 1, 0, '' " + "from generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as aid", + naccounts, (int64) i * naccounts, + (int64) (i + 1) * naccounts - 1); + executeStatement(con, sql.data); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); termPQExpBuffer(&sql); +} + +/* + * Generating data via INSERT .. SELECT .. FROM unnest + * Possibly as "One transaction per scale" in multi-tansaction mode + */ +static void +generateDataInsertUnnest(PGconn *con) +{ + PQExpBufferData sql; - executeStatement(con, "commit"); + fprintf(stderr, "via INSERT .. SELECT unnest...\n"); + + initPQExpBuffer(&sql); + + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + if (multi_xact) + executeStatement(con, "commit"); + + for (int s = 0; s < scale; s++) + { + if (multi_xact) + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid,bbalance) " + "values(%d, 0)", s + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select unnest(array_agg(s.i order by s.i)) as tid, " + "%d as bid, 0 as tbalance " + "from generate_series(%d, %d) as s(i)", + s + 1, s * ntellers + 1, (s + 1) * ntellers); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "with data as (" + " select generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as i) " + "insert into pgbench_accounts(aid, bid, " + "abalance, filler) " + "select unnest(aid), unnest(bid), 0 as abalance, " + "'' as filler " + "from (select array_agg(i+1) aid, " + "array_agg(i/%d + 1) bid from data)", + (int64) s * naccounts + 1, + (int64) (s + 1) * naccounts, naccounts); + executeStatement(con, sql.data); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); + + termPQExpBuffer(&sql); +} + +/* + * Fill the standard tables with some data generated on the server side + * + * As already the case with the client-side data generation, the filler + * column defaults to NULL in pgbench_branches and pgbench_tellers, + * and is a blank-padded string in pgbench_accounts. + */ +static void +initGenerateDataServerSide(PGconn *con) +{ + fprintf(stderr, "generating data (server-side as %s transaction%s) ", + multi_xact ? "multiple" : "single", multi_xact ? "s" : ""); + + switch (data_generation_type) + { + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + generateDataInsertSeries(con); + break; + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: + generateDataInsertUnnest(con); + break; + } } /* @@ -5306,6 +5833,8 @@ initCreateFKeys(PGconn *con) static void checkInitSteps(const char *initialize_steps) { + char data_init_type = 0; + if (initialize_steps[0] == '\0') pg_fatal("no initialization steps specified"); @@ -5317,7 +5846,22 @@ checkInitSteps(const char *initialize_steps) pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\"."); exit(1); } + + switch (*step) + { + case INIT_STEP_GEN_TYPE_COPY_TEXT: + case INIT_STEP_GEN_TYPE_COPY_BINARY: + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: + data_init_type++; + break; + } } + + if (data_init_type == 0) + pg_log_warning("No data generation type is provided"); + if (data_init_type > 1) + pg_log_warning("More than one type of data initialization is requested"); } /* @@ -5355,14 +5899,24 @@ runInitSteps(const char *initialize_steps) op = "create tables"; initCreateTables(con); break; - case 'g': + case INIT_STEP_GEN_TYPE_COPY_TEXT: + case INIT_STEP_GEN_TYPE_COPY_BINARY: op = "client-side generate"; + data_generation_type = *step; initGenerateDataClientSide(con); break; - case 'G': + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: op = "server-side generate"; + data_generation_type = *step; initGenerateDataServerSide(con); break; + case INIT_STEP_GEN_TYPE_SINGLE_XACT: + multi_xact = false; + break; + case INIT_STEP_GEN_TYPE_MULTI_XACT: + multi_xact = true; + break; case 'v': op = "vacuum"; initVacuum(con); @@ -6940,7 +7494,6 @@ main(int argc, char **argv) case 'I': pg_free(initialize_steps); initialize_steps = pg_strdup(optarg); - checkInitSteps(initialize_steps); initialization_option_set = true; break; case 'j': /* jobs */ @@ -7245,6 +7798,7 @@ main(int argc, char **argv) } } + checkInitSteps(initialize_steps); runInitSteps(initialize_steps); exit(0); } diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b7685ea5d20..6c7783a77f7 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -16,25 +16,30 @@ sub check_data_state local $Test::Builder::Level = $Test::Builder::Level + 1; my $node = shift; my $type = shift; + my $sql_result; - my $sql_result = $node->safe_psql('postgres', - 'SELECT count(*) AS null_count FROM pgbench_accounts WHERE filler IS NULL LIMIT 10;' - ); - is($sql_result, '0', - "$type: filler column of pgbench_accounts has no NULL data"); $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS null_count FROM pgbench_branches WHERE filler IS NULL;' ); is($sql_result, '1', "$type: filler column of pgbench_branches has only NULL data"); + $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS null_count FROM pgbench_tellers WHERE filler IS NULL;' ); is($sql_result, '10', "$type: filler column of pgbench_tellers has only NULL data"); + + $sql_result = $node->safe_psql('postgres', + 'SELECT count(*) AS null_count FROM pgbench_accounts WHERE filler IS NULL LIMIT 10;' + ); + is($sql_result, '0', + "$type: filler column of pgbench_accounts has no NULL data"); + $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS data_count FROM pgbench_history;'); - is($sql_result, '0', "$type: pgbench_history has no data"); + is($sql_result, '0', + "$type: pgbench_history has no data"); } # start a pgbench specific server @@ -112,6 +117,7 @@ $node->pgbench( [qr{Perhaps you need to do initialization}], 'run without init'); + # Initialize pgbench tables scale 1 $node->pgbench( '-i', 0, @@ -125,7 +131,7 @@ $node->pgbench( 'pgbench scale 1 initialization',); # Check data state, after client-side data generation. -check_data_state($node, 'client-side'); +check_data_state($node, 'client-side (default options)'); # Again, with all possible options $node->pgbench( @@ -143,6 +149,7 @@ $node->pgbench( qr{done in \d+\.\d\d s } ], 'pgbench scale 1 initialization'); +check_data_state($node, 'client-side (all options)'); # Test interaction of --init-steps with legacy step-selection options $node->pgbench( @@ -154,7 +161,7 @@ $node->pgbench( qr{creating tables}, qr{creating 3 partitions}, qr{creating primary keys}, - qr{generating data \(server-side\)}, + qr{generating data \(server-side as single transaction\)}, qr{creating foreign keys}, qr{(?!vacuuming)}, # no vacuum qr{done in \d+\.\d\d s } @@ -164,6 +171,219 @@ $node->pgbench( # Check data state, after server-side data generation. check_data_state($node, 'server-side'); + +# Test server-side generation with generate_series +$node->pgbench( + '--initialize --init-steps=dtG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series)'); + +$node->pgbench( + '--initialize --init-steps=dtSG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series single XACT)'); + +$node->pgbench( + '--initialize --init-steps=dtMG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as multiple transactions\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series multiple XACTs)'); + + +# Test server-side generation with UNNEST +$node->pgbench( + '--initialize --init-steps=dtU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + +$node->pgbench( + '--initialize --init-steps=dtSU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + +$node->pgbench( + '--initialize --init-steps=dtMU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as multiple transactions\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + + +# Test client-side generation with COPY TEXT +$node->pgbench( + '--initialize --init-steps=dtg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (single XACT #1)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + +$node->pgbench( + '--initialize --init-steps=dtSg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{\d of \d+ tuples \(\d%\) of pgbench_branches done}, + qr{\d of \d+ tuples \(\d%\) of pgbench_tellers done}, + qr{\d of \d+ tuples \(\d%\) of pgbench_accounts done}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (single XACT #2)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + +$node->pgbench( + '--initialize --init-steps=dtMg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (multiple XACTs)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + + +# Test client-side generation with COPY BINARY +$node->pgbench( + '--initialize --init-steps=dtc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (single XACT #1)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtSc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (single XACT #2)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtMc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + + +# Check data state, after different modes of client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtMccSc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{generating data \(client-side as multiple transactions}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (multiple XACT modes)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary different XACT modes in list of --init-steps)'); + + # Run all builtin scripts, for a few transactions each $node->pgbench( '--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t' -- 2.43.0