From 0728193a5f02d0dd6a1f3ec5fef314aec646ba33 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 17 Feb 2023 21:01:15 +0100 Subject: [PATCH v8] pgbench: Prepare commands in pipelines in advance Failing to do so results in an error when a pgbench script starts a serializable transaction inside a pipeline. --- src/bin/pgbench/pgbench.c | 155 +++++++++++++------ src/bin/pgbench/t/001_pgbench_with_server.pl | 20 +++ 2 files changed, 126 insertions(+), 49 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 508ed218e8..38e0830e7e 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -628,7 +628,8 @@ typedef struct pg_time_usec_t txn_begin; /* used for measuring schedule lag times */ pg_time_usec_t stmt_begin; /* used for measuring statement latencies */ - bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + /* whether client prepared each command of each script */ + bool **prepared; /* * For processing failures and repeating transactions with serialization @@ -733,12 +734,13 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; * argv Command arguments, the first of which is the command or SQL * string itself. For SQL commands, after post-processing * argv[0] is the same as 'lines' with variables substituted. - * varprefix SQL commands terminated with \gset or \aset have this set + * varprefix SQL commands terminated with \gset or \aset have this set * to a non NULL value. If nonempty, it's used to prefix the * variable name that receives the value. * aset do gset on all possible queries of a combined query (\;). * expr Parsed expression, if needed. * stats Time spent in this command. + * prepname The name that this command is prepared under, in prepare mode * retries Number of retries after a serialization or deadlock error in the * current command. * failures Number of errors in the current command that were not retried. 
@@ -754,6 +756,7 @@ typedef struct Command char *varprefix; PgBenchExpr *expr; SimpleStats stats; + char *prepname; int64 retries; int64 failures; } Command; @@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc) return true; } -#define MAX_PREPARE_NAME 32 -static void -preparedStatementName(char *buffer, int file, int state) -{ - sprintf(buffer, "P%d_%d", file, state); -} - /* * Report the abortion of the client when processing SQL commands. */ @@ -3053,6 +3049,87 @@ chooseScript(TState *thread) return i - 1; } +/* + * Prepare the SQL command from st->use_file at command_num. + */ +static void +prepareCommand(CState *st, int command_num) +{ + Command *command = sql_script[st->use_file].commands[command_num]; + + /* No prepare for non-SQL commands */ + if (command->type != SQL_COMMAND) + return; + + /* + * If not already done, allocate space for 'prepared' flags: one boolean + * for each command of each script. + */ + if (!st->prepared) + { + st->prepared = pg_malloc(sizeof(bool *) * num_scripts); + for (int i = 0; i < num_scripts; i++) + { + ParsedScript *script = &sql_script[i]; + int numcmds; + + for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++) + ; + st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds); + } + } + + if (!st->prepared[st->use_file][command_num]) + { + PGresult *res; + + pg_log_debug("client %d preparing %s", st->id, command->prepname); + res = PQprepare(st->con, command->prepname, + command->argv[0], command->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + st->prepared[st->use_file][command_num] = true; + } +} + +/* + * Prepare all the commands in the script that come after the \startpipeline + * that's at position st->command and before the first \endpipeline we find. 
+ * + * (This sets the ->prepared flag for each relevant command, but doesn't move + * the st->command counter) + */ +static void +prepareCommandsInPipeline(CState *st) +{ + int j; + Command **commands = sql_script[st->use_file].commands; + + Assert(commands[st->command]->type == META_COMMAND && + commands[st->command]->meta == META_STARTPIPELINE); + + /* + * We set the 'prepared' flag on the \startpipeline itself to flag that we + * don't need to do this next time without calling prepareCommand(), even + * though we don't actually prepare this command. + */ + if (st->prepared && + st->prepared[st->use_file][st->command]) + return; + + for (j = st->command + 1; commands[j] != NULL; j++) + { + if (commands[j]->type == META_COMMAND && + commands[j]->meta == META_ENDPIPELINE) + break; + + prepareCommand(st, j); + } + if (st->prepared) /* still NULL if the pipeline held no SQL command */ + st->prepared[st->use_file][st->command] = true; +} + /* Send a SQL command, using the chosen querymode */ static bool sendCommand(CState *st, Command *command) @@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command) } else if (querymode == QUERY_PREPARED) { - char name[MAX_PREPARE_NAME]; const char *params[MAX_ARGS]; - if (!st->prepared[st->use_file]) - { - int j; - Command **commands = sql_script[st->use_file].commands; - - for (j = 0; commands[j] != NULL; j++) - { - PGresult *res; - - if (commands[j]->type != SQL_COMMAND) - continue; - preparedStatementName(name, st->use_file, j); - if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) - { - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); - } - else - { - /* - * In pipeline mode, we use asynchronous functions. If a - * server-side error occurs, it will be processed later - * among the other results. 
- */ - if (!PQsendPrepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL)) - pg_log_error("%s", PQerrorMessage(st->con)); - } - } - st->prepared[st->use_file] = true; - } - + prepareCommand(st, st->command); getQueryParams(&st->variables, command, params); - preparedStatementName(name, st->use_file, st->command); - pg_log_debug("client %d sending %s", st->id, name); - r = PQsendQueryPrepared(st->con, name, command->argc - 1, + pg_log_debug("client %d sending %s", st->id, command->prepname); + r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1, params, NULL, NULL, 0); } else /* unknown sql mode */ @@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) thread->conn_duration += now - start; /* Reset session-local state */ - memset(st->prepared, 0, sizeof(st->prepared)); + pg_free(st->prepared); + st->prepared = NULL; } /* @@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } + /* + * If we're in prepared-query mode, we need to prepare all the + * commands that are inside the pipeline before we actually start the + * pipeline itself. This solves the problem that running BEGIN + * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a + * snapshot having been acquired by the prepare within the pipeline. 
+ */ + if (querymode == QUERY_PREPARED) + prepareCommandsInPipeline(st); + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) { commandFailed(st, "startpipeline", "already in pipeline mode"); @@ -5421,6 +5473,7 @@ create_sql_command(PQExpBuffer buf, const char *source) { Command *my_command; char *p = skip_sql_comments(buf->data); + static int prepnr = 0; if (p == NULL) return NULL; @@ -5439,6 +5492,10 @@ create_sql_command(PQExpBuffer buf, const char *source) my_command->varprefix = NULL; /* allocated later, if needed */ my_command->expr = NULL; initSimpleStats(&my_command->stats); + if (querymode == QUERY_PREPARED) + my_command->prepname = psprintf("P_%d", prepnr++); + else + my_command->prepname = NULL; return my_command; } diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 4bf508ea96..99273203f0 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -839,6 +839,26 @@ select 1 \gset f } }); +# Working \startpipeline in prepared query mode with serializable +$node->pgbench( + '-c4 -j2 -t 10 -n -M prepared', + 0, + [ + qr{type: .*/001_pgbench_pipeline_serializable}, + qr{actually processed: (\d+)/\1} + ], + [], + 'working \startpipeline with serializable', + { + '001_pgbench_pipeline_serializable' => q{ +-- test startpipeline with serializable +\startpipeline +BEGIN ISOLATION LEVEL SERIALIZABLE; +} . "select 1;\n" x 10 . q{ +END; +\endpipeline +} + }); # trigger many expression errors my @errors = ( -- 2.30.2