diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index ebf9ea3598..3c50f739d0 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8481,6 +8481,7 @@ drop table loct2; -- =================================================================== -- test COPY FROM -- =================================================================== +alter server loopback options (add batch_size '2'); create table loc2 (f1 int, f2 text); alter table loc2 set (autovacuum_enabled = 'false'); create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); @@ -8503,7 +8504,7 @@ copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +COPY rem2, line 2 select * from rem2; f1 | f2 ----+----- @@ -8514,6 +8515,19 @@ select * from rem2; alter foreign table rem2 drop constraint rem2_f1positive; alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); +copy foo from stdin; +NOTICE: (1) -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -8622,6 +8636,34 @@ drop trigger rem2_trig_row_before on rem2; drop trigger rem2_trig_row_after on rem2; drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +ERROR: column "f1" of relation "loc2" does not exist +CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2), ($3, $4) +COPY rem2, line 3 +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(2 rows) + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(4 rows) + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; @@ -8638,6 +8680,7 @@ select * from rem3; drop foreign table rem3; drop table loc3; +alter server loopback options (drop batch_size); -- =================================================================== -- test for TRUNCATE -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index f3b93954ee..3bee9d19a3 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2057,6 +2057,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) resultRelInfo->ri_TrigDesc->trig_insert_after_row))) return 1; + /* + * If the foreign table has no columns, disable batching as the INSERT + * syntax doesn't allow batching multiple empty rows into a zero-column + * table. This isn't needed in case of INSERT, but is in case of COPY. + * Note that in the latter case fmstate must be non-NULL. + */ + if (fmstate && list_length(fmstate->target_attrs) == 0) + return 1; + /* * Otherwise use the batch size specified for server/table. The number of * parameters in a batch is limited to 65535 (uint16), so make sure we diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index b7817c5a41..e013d55313 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2350,6 +2350,7 @@ drop table loct2; -- test COPY FROM -- =================================================================== +alter server loopback options (add batch_size '2'); create table loc2 (f1 int, f2 text); alter table loc2 set (autovacuum_enabled = 'false'); create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); @@ -2382,6 +2383,23 @@ alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); + +copy foo from stdin; +1 +2 +\. + -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -2482,6 +2500,34 @@ drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +1 foo +2 bar +\. + +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; @@ -2495,6 +2541,7 @@ commit; select * from rem3; drop foreign table rem3; drop table loc3; +alter server loopback options (drop batch_size); -- =================================================================== -- test for TRUNCATE diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index a976008b3d..b1cb589d60 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -320,18 +320,44 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + { + int sent = 0; + + Assert(resultRelInfo->ri_BatchSize > 1 && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize != NULL); + + /* Flush into foreign table or partition */ + while (sent < nused) + { + int size = (resultRelInfo->ri_BatchSize < nused - sent) ? + resultRelInfo->ri_BatchSize : (nused - sent); + int inserted = size; + + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + &slots[sent], + NULL, + &inserted); + sent += size; + } + } + else + { + /* + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + } for (i = 0; i < nused; i++) { @@ -343,6 +369,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, { List *recheckIndexes; + Assert(resultRelInfo->ri_RelationDesc->rd_rel->relkind != RELKIND_FOREIGN_TABLE); + cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, @@ -679,6 +707,23 @@ CopyFrom(CopyFromState cstate) resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, resultRelInfo); + /* + * Also, if the named relation is a foreign table, determine if the FDW + * supports batch insert and determine the batch size (a FDW may support + * batching, but it may be disabled for the server/table). + * + * If the FDW does not support batching, we set the batch size to 1. + */ + if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -725,6 +770,15 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } + else if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_BatchSize == 1) + { + /* + * Can't support multi-inserts to foreign tables if the FDW does not + * support batching. + */ + insertMethod = CIM_SINGLE; + } else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && resultRelInfo->ri_TrigDesc->trig_insert_new_table) { @@ -737,14 +791,12 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs) { /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. + * Can't support multi-inserts if there are any volatile default + * expressions in the table. Similarly to the trigger case above, + * such expressions may query the table we're inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the @@ -910,12 +962,14 @@ CopyFrom(CopyFromState cstate) /* * Disable multi-inserts when the partition has BEFORE/INSTEAD - * OF triggers, or if the partition is a foreign partition. + * OF triggers, or if the partition is a foreign partition + * that can't use batching. */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + (resultRelInfo->ri_FdwRoutine == NULL || + resultRelInfo->ri_BatchSize > 1); /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert)