diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 129ca79221..a7672012f1 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -59,6 +59,7 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ bool parallel_commit; /* do we commit (sub)xacts in parallel? */ + bool parallel_abort; /* do we abort (sub)xacts in parallel? */ bool invalidated; /* true if reconnect is pending */ bool keep_connections; /* setting value of keep_connections * server option */ @@ -80,6 +81,21 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* Milliseconds to wait to cancel a query or execute a cleanup query */ +#define CONNECTION_CLEANUP_TIMEOUT 30000 + +/* Macro for constructing abort command to be sent */ +#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \ + do { \ + if (toplevel) \ + snprintf((sql), sizeof(sql), \ + "ABORT TRANSACTION"); \ + else \ + snprintf((sql), sizeof(sql), \ + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \ + (entry)->xact_depth, (entry)->xact_depth); \ + } while(0) + /* * SQL functions */ @@ -106,14 +122,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); +static bool pgfdw_cancel_query_begin(PGconn *conn); +static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, + bool consume_input); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); +static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query); +static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, + TimestampTz endtime, + bool consume_input, + bool ignore_errors); static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel); +static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, + List **pending_entries, + List **cancel_requested); static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel); +static void pgfdw_finish_abort_cleanup(List *pending_entries, + List *cancel_requested, + bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -324,11 +354,12 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) * * By default, all the connections to any foreign servers are kept open. * - * Also determine whether to commit (sub)transactions opened on the remote - * server in parallel at (sub)transaction end. + * Also determine whether to commit/abort (sub)transactions opened on the + * remote server in parallel at (sub)transaction end. */ entry->keep_connections = true; entry->parallel_commit = false; + entry->parallel_abort = false; foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); @@ -337,6 +368,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->keep_connections = defGetBoolean(def); else if (strcmp(def->defname, "parallel_commit") == 0) entry->parallel_commit = defGetBoolean(def); + else if (strcmp(def->defname, "parallel_abort") == 0) + entry->parallel_abort = defGetBoolean(def); } /* Now try to make the connection */ @@ -922,6 +955,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) HASH_SEQ_STATUS scan; ConnCacheEntry *entry; List *pending_entries = NIL; + List *cancel_requested = NIL; /* Quick exit if no connections were touched in this transaction. */ if (!xact_got_connection) @@ -1015,7 +1049,15 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: /* Rollback all remote transactions during abort */ - pgfdw_abort_cleanup(entry, true); + if (entry->parallel_abort) + { + if (pgfdw_abort_cleanup_begin(entry, true, + &pending_entries, + &cancel_requested)) + continue; + } + else + pgfdw_abort_cleanup(entry, true); break; } } @@ -1025,11 +1067,21 @@ pgfdw_xact_callback(XactEvent event, void *arg) } /* If there are any pending connections, finish cleaning them up */ - if (pending_entries) + if (pending_entries || cancel_requested) { - Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || - event == XACT_EVENT_PRE_COMMIT); - pgfdw_finish_pre_commit_cleanup(pending_entries); + if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || + event == XACT_EVENT_PRE_COMMIT) + { + Assert(cancel_requested == NIL); + pgfdw_finish_pre_commit_cleanup(pending_entries); + } + else + { + Assert(event == XACT_EVENT_PARALLEL_ABORT || + event == XACT_EVENT_ABORT); + pgfdw_finish_abort_cleanup(pending_entries, cancel_requested, + true); + } } /* @@ -1054,6 +1106,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, ConnCacheEntry *entry; int curlevel; List *pending_entries = NIL; + List *cancel_requested = NIL; /* Nothing to do at subxact start, nor after commit. */ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || @@ -1108,7 +1161,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, else { /* Rollback all remote subtransactions during abort */ - pgfdw_abort_cleanup(entry, false); + if (entry->parallel_abort) + { + if (pgfdw_abort_cleanup_begin(entry, false, + &pending_entries, + &cancel_requested)) + continue; + } + else + pgfdw_abort_cleanup(entry, false); } /* OK, we're outta that level of subtransaction */ @@ -1116,10 +1177,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } /* If there are any pending connections, finish cleaning them up */ - if (pending_entries) + if (pending_entries || cancel_requested) { - Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); - pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel); + if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) + { + Assert(cancel_requested == NIL); + pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel); + } + else + { + Assert(event == SUBXACT_EVENT_ABORT_SUB); + pgfdw_finish_abort_cleanup(pending_entries, cancel_requested, + false); + } } } @@ -1263,17 +1333,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) static bool pgfdw_cancel_query(PGconn *conn) { - PGcancel *cancel; - char errbuf[256]; - PGresult *result = NULL; TimestampTz endtime; - bool timed_out; /* * If it takes too long to cancel the query and discard the result, assume * the connection is dead. */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + if (!pgfdw_cancel_query_begin(conn)) + return false; + return pgfdw_cancel_query_end(conn, endtime, false); +} + +static bool +pgfdw_cancel_query_begin(PGconn *conn) +{ + PGcancel *cancel; + char errbuf[256]; /* * Issue cancel request. Unfortunately, there's no good way to limit the @@ -1293,6 +1371,31 @@ pgfdw_cancel_query(PGconn *conn) PQfreeCancel(cancel); } + return true; +} + +static bool +pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input) +{ + PGresult *result = NULL; + bool timed_out; + + /* + * If requested, consume whatever data is available from the socket. + * (Note that if all data is available, this allows + * pgfdw_get_cleanup_result to call PQgetResult without forcing the + * overhead of WaitLatchOrSocket, which would be large compared to the + * overhead of PQconsumeInput.) + */ + if (consume_input && !PQconsumeInput(conn)) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not get result of cancel request: %s", + pchomp(PQerrorMessage(conn))))); + return false; + } + /* Get and discard the result of the query. */ if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) { @@ -1327,9 +1430,7 @@ pgfdw_cancel_query(PGconn *conn) static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) { - PGresult *result = NULL; TimestampTz endtime; - bool timed_out; /* * If it takes too long to execute a cleanup query, assume the connection @@ -1337,8 +1438,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) * place (e.g. statement timeout, user cancel), so the timeout shouldn't * be too long. */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + if (!pgfdw_exec_cleanup_query_begin(conn, query)) + return false; + return pgfdw_exec_cleanup_query_end(conn, query, endtime, + false, ignore_errors); +} +static bool +pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query) +{ /* * Submit a query. Since we don't use non-blocking mode, this also can * block. But its risk is relatively small, so we ignore that for now. @@ -1349,6 +1460,30 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) return false; } + return true; +} + +static bool +pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, + TimestampTz endtime, bool consume_input, + bool ignore_errors) +{ + PGresult *result = NULL; + bool timed_out; + + /* + * If requested, consume whatever data is available from the socket. + * (Note that if all data is available, this allows + * pgfdw_get_cleanup_result to call PQgetResult without forcing the + * overhead of WaitLatchOrSocket, which would be large compared to the + * overhead of PQconsumeInput.) + */ + if (consume_input && !PQconsumeInput(conn)) + { + pgfdw_report_error(WARNING, NULL, conn, false, query); + return false; + } + /* Get the result of the query. */ if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) { @@ -1504,12 +1639,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel) !pgfdw_cancel_query(entry->conn)) return; /* Unable to cancel running query */ - if (toplevel) - snprintf(sql, sizeof(sql), "ABORT TRANSACTION"); - else - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - entry->xact_depth, entry->xact_depth); + CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel); if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) return; /* Unable to abort remote (sub)transaction */ @@ -1538,6 +1668,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel) entry->changing_xact_state = false; } +/* + * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but + * don't wait for the result. + * + * Returns true if the abort command or cancel request is successfully issued, + * false otherwise. If the abort command is successfully issued, the given + * connection cache entry is appended to *pending_entries. Othewise, if the + * cancel request is successfully issued, it's appended to *cancel_requested. + */ +static bool +pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, + List **pending_entries, List **cancel_requested) +{ + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is already unsalvageable, don't touch it further. + */ + if (entry->changing_xact_state) + return false; + + /* + * Mark this connection as in the process of changing transaction state. + */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by using an + * asynchronous execution function, the command might not have yet + * completed. Check to see if a command is still being processed by the + * remote server, and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + { + if (!pgfdw_cancel_query_begin(entry->conn)) + return false; /* Unable to cancel running query */ + *cancel_requested = lappend(*cancel_requested, entry); + } + else + { + char sql[100]; + + CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel); + if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql)) + return false; /* Unable to abort remote transaction */ + *pending_entries = lappend(*pending_entries, entry); + } + + return true; +} + /* * Finish pre-commit cleanup of connections on each of which we've sent a * COMMIT command to the remote server. @@ -1644,6 +1833,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel) } } +/* + * Finish abort cleanup of connections on each of which we've sent an abort + * command or cancel request to the remote server. + */ +static void +pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, + bool toplevel) +{ + List *pending_deallocs = NIL; + ListCell *lc; + + /* + * For each of the pending cancel requests (if any), get and discard the + * result of the query, and submit an abort command to the remote server. + */ + if (cancel_requested) + { + foreach(lc, cancel_requested) + { + ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc); + TimestampTz endtime; + char sql[100]; + + Assert(entry->changing_xact_state); + + /* + * Set end time. You might think we should do this before issuing + * cancel request like in normal mode, but that is problematic, + * because if, for example, it took longer than 30 seconds to + * process the first few entries in the cancel_requested list, it + * would cause a timeout error when processing each of the + * remaining entries in the list, leading to slamming that entry's + * connection shut. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + if (!pgfdw_cancel_query_end(entry->conn, endtime, true)) + { + /* Unable to cancel running query */ + pgfdw_reset_xact_state(entry, toplevel); + continue; + } + + /* Send an abort command in parallel if needed */ + CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel); + if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql)) + { + /* Unable to abort remote (sub)transaction */ + pgfdw_reset_xact_state(entry, toplevel); + } + else + pending_entries = lappend(pending_entries, entry); + } + } + + /* No further work if no pending entries */ + if (!pending_entries) + return; + + /* + * Get the result of the abort command for each of the pending entries + */ + foreach(lc, pending_entries) + { + ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc); + TimestampTz endtime; + char sql[100]; + + Assert(entry->changing_xact_state); + + /* + * Set end time. We do this now, not before issuing the command like + * in normal mode, for the same reason as for the cancel_requested + * entries. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel); + if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime, + true, false)) + { + /* Unable to abort remote (sub)transaction */ + pgfdw_reset_xact_state(entry, toplevel); + continue; + } + + if (toplevel) + { + /* Do a DEALLOCATE ALL in parallel if needed */ + if (entry->have_prep_stmt && entry->have_error) + { + if (!pgfdw_exec_cleanup_query_begin(entry->conn, + "DEALLOCATE ALL")) + { + /* Trouble clearing prepared statements */ + pgfdw_reset_xact_state(entry, toplevel); + } + else + pending_deallocs = lappend(pending_deallocs, entry); + continue; + } + entry->have_prep_stmt = false; + entry->have_error = false; + } + + /* Reset the per-connection state if needed */ + if (entry->state.pendingAreq) + memset(&entry->state, 0, sizeof(entry->state)); + + /* We're done with this entry; unset the changing_xact_state flag */ + entry->changing_xact_state = false; + pgfdw_reset_xact_state(entry, toplevel); + } + + /* No further work if no pending entries */ + if (!pending_deallocs) + return; + Assert(toplevel); + + /* + * Get the result of the DEALLOCATE command for each of the pending + * entries + */ + foreach(lc, pending_deallocs) + { + ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc); + TimestampTz endtime; + + Assert(entry->changing_xact_state); + Assert(entry->have_prep_stmt); + Assert(entry->have_error); + + /* + * Set end time. We do this now, not before issuing the command like + * in normal mode, for the same reason as for the cancel_requested + * entries. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL", + endtime, true, true)) + { + /* Trouble clearing prepared statements */ + pgfdw_reset_xact_state(entry, toplevel); + continue; + } + entry->have_prep_stmt = false; + entry->have_error = false; + + /* Reset the per-connection state if needed */ + if (entry->state.pendingAreq) + memset(&entry->state, 0, sizeof(entry->state)); + + /* We're done with this entry; unset the changing_xact_state flag */ + entry->changing_xact_state = false; + pgfdw_reset_xact_state(entry, toplevel); + } +} + /* * List active foreign server connections. * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index f210f91188..ed5bf9208f 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9509,7 +9509,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, parallel_abort, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -10934,10 +10934,12 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity RESET postgres_fdw.application_name; RESET debug_discard_caches; -- =================================================================== --- test parallel commit +-- test parallel commit and parallel abort -- =================================================================== ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true'); ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true'); CREATE TABLE ploc1 (f1 int, f2 text); CREATE FOREIGN TABLE prem1 (f1 int, f2 text) SERVER loopback OPTIONS (table_name 'ploc1'); @@ -11007,5 +11009,52 @@ SELECT * FROM prem2; 204 | quxqux (3 rows) +BEGIN; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ABORT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo + 104 | bazbaz +(3 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar + 204 | quxqux +(3 rows) + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ABORT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo + 104 | bazbaz +(3 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar + 204 | quxqux +(3 rows) + ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback OPTIONS (DROP parallel_abort); ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_abort); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 572591a558..59f865fac3 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -122,6 +122,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) strcmp(def->defname, "truncatable") == 0 || strcmp(def->defname, "async_capable") == 0 || strcmp(def->defname, "parallel_commit") == 0 || + strcmp(def->defname, "parallel_abort") == 0 || strcmp(def->defname, "keep_connections") == 0) { /* these accept only boolean values */ @@ -251,6 +252,7 @@ InitPgFdwOptions(void) {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, {"parallel_commit", ForeignServerRelationId, false}, + {"parallel_abort", ForeignServerRelationId, false}, {"keep_connections", ForeignServerRelationId, false}, {"password_required", UserMappingRelationId, false}, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 95b6b7192e..bd26739d9a 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3517,10 +3517,12 @@ RESET postgres_fdw.application_name; RESET debug_discard_caches; -- =================================================================== --- test parallel commit +-- test parallel commit and parallel abort -- =================================================================== ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true'); ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true'); CREATE TABLE ploc1 (f1 int, f2 text); CREATE FOREIGN TABLE prem1 (f1 int, f2 text) @@ -3559,5 +3561,26 @@ COMMIT; SELECT * FROM prem1; SELECT * FROM prem2; +BEGIN; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ABORT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (105, 'test1'); +INSERT INTO prem2 VALUES (205, 'test2'); +ABORT; +SELECT * FROM prem1; +SELECT * FROM prem2; + ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback OPTIONS (DROP parallel_abort); ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_abort); diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index d8dc715587..b86b3e9cc3 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -461,10 +461,10 @@ OPTIONS (ADD password_required 'false'); When multiple remote (sub)transactions are involved in a local - (sub)transaction, by default postgres_fdw commits - those remote (sub)transactions one by one when the local (sub)transaction - commits. - Performance can be improved with the following option: + (sub)transaction, by default postgres_fdw commits or + aborts those remote (sub)transactions one by one when the local + (sub)transaction commits or aborts. + Performance can be improved with the following options: @@ -479,27 +479,40 @@ OPTIONS (ADD password_required 'false'); This option can only be specified for foreign servers, not per-table. The default is false. + + + + parallel_abort (boolean) + - If multiple foreign servers with this option enabled are involved in - a local (sub)transaction, multiple remote (sub)transactions opened on - those foreign servers in the local (sub)transaction are committed in - parallel across those foreign servers when the local (sub)transaction - commits. - - - - For a foreign server with this option enabled, if many remote - (sub)transactions are opened on the foreign server in a local - (sub)transaction, this option might increase the remote server’s load - when the local (sub)transaction commits, so be careful when using this - option. + This option controls whether postgres_fdw aborts + remote (sub)transactions opened on a foreign server in a local + (sub)transaction in parallel when the local (sub)transaction aborts. + This option can only be specified for foreign servers, not per-table. + The default is false. + + If multiple foreign servers with these options enabled are involved in a + local (sub)transaction, multiple remote (sub)transactions opened on those + foreign servers in the local (sub)transaction are committed or aborted in + parallel across those foreign servers when the local (sub)transaction + commits or aborts. + + + + For a foreign server with these options enabled, if many remote + (sub)transactions are opened on the foreign server in a local + (sub)transaction, these options might increase the remote server’s load + when the local (sub)transaction commits or aborts, so be careful when + using these options. + +