diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 0db8d74..5668977 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2063,6 +2063,21 @@ postgresEndForeignInsert(EState *estate, finish_foreign_modify(fmstate); } +static PgFdwModifyState *copy_fmstate = NULL; + +static void +pgfdw_copy_dest_cb(void *buf, int len) +{ + PGconn *conn = copy_fmstate->conn; + + if (PQputCopyData(conn, (char *) buf, len) <= 0) + { + PGresult *res = PQgetResult(conn); + + pgfdw_report_error(ERROR, res, conn, true, copy_fmstate->query); + } +} + /* * * postgresBeginForeignCopyIn @@ -2076,6 +2091,8 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate, Relation rel = resultRelInfo->ri_RelationDesc; StringInfoData sql; RangeTblEntry *rte; + PGconn *conn; + PGresult *res; rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state); initStringInfo(&sql); @@ -2090,8 +2107,16 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate, NIL, false, NIL); - fmstate->cstate = BeginForeignCopyTo(resultRelInfo->ri_RelationDesc); + fmstate->cstate = BeginForeignCopyTo(rel, pgfdw_copy_dest_cb); resultRelInfo->ri_FdwState = fmstate; + + conn = fmstate->conn; + res = PQexec(conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, conn, true, fmstate->query); + PQclear(res); + + copy_fmstate = fmstate; } /* @@ -2102,14 +2127,40 @@ static void postgresEndForeignCopyIn(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGconn *conn = fmstate->conn; + PGresult *res; /* Check correct use of CopyIn FDW API. */ Assert(fmstate->cstate != NULL); + /* + * Finish COPY IN protocol. It is needed to do after successful copy or + * after an error. + */ + if (PQputCopyEnd(conn, NULL) <= 0 || + PQflush(conn)) + ereport(ERROR, + (errmsg("error returned by PQputCopyEnd: %s", + PQerrorMessage(conn)))); + + /* After successfully sending an EOF signal, check command status. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + PQclear(res); + + /* Do this to ensure we've pumped libpq back to idle state */ + if (PQgetResult(conn) != NULL) + ereport(ERROR, + (errmsg("unexpected extra results during COPY of table: %s", + PQerrorMessage(conn)))); + EndForeignCopyTo(fmstate->cstate); pfree(fmstate->cstate); fmstate->cstate = NULL; finish_foreign_modify(fmstate); + + copy_fmstate = NULL; } /* @@ -2122,58 +2173,21 @@ postgresExecForeignCopyIn(ResultRelInfo *resultRelInfo, TupleTableSlot **slots, int nslots) { PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; - PGresult *res; - PGconn *conn = fmstate->conn; bool status = false; int i; /* Check correct use of CopyIn FDW API. */ Assert(fmstate->cstate != NULL); - res = PQexec(conn, fmstate->query); - if (PQresultStatus(res) != PGRES_COPY_IN) - pgfdw_report_error(ERROR, res, conn, true, fmstate->query); - PQclear(res); - PG_TRY(); { for (i = 0; i < nslots; i++) - { - char *buf = NextForeignCopyRow(fmstate->cstate, slots[i]); - - if (PQputCopyData(conn, buf, strlen(buf)) <= 0) - { - res = PQgetResult(conn); - pgfdw_report_error(ERROR, res, conn, true, fmstate->query); - } - } + NextForeignCopyRow(fmstate->cstate, slots[i]); status = true; } PG_FINALLY(); { - /* Finish COPY IN protocol. It is needed to do after successful copy or - * after an error. - */ - if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 || - PQflush(conn)) - ereport(ERROR, - (errmsg("error returned by PQputCopyEnd: %s", - PQerrorMessage(conn)))); - - /* After successfully sending an EOF signal, check command status. */ - res = PQgetResult(conn); - if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) || - (status && PQresultStatus(res) != PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - - PQclear(res); - /* Do this to ensure we've pumped libpq back to idle state */ - if (PQgetResult(conn) != NULL) - ereport(ERROR, - (errmsg("unexpected extra results during COPY of table: %s", - PQerrorMessage(conn)))); - if (!status) PG_RE_THROW(); } diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f5f1d40..51b7233 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -594,7 +594,6 @@ CopySendEndOfRow(CopyState cstate) break; case COPY_CALLBACK: CopySendChar(cstate, '\n'); - CopySendChar(cstate, '\0'); cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -1823,16 +1822,8 @@ EndCopy(CopyState cstate) pfree(cstate); } -static char *buf = NULL; -static void -data_dest_cb(void *outbuf, int len) -{ - buf = (char *) palloc(len); - memcpy(buf, (char *) outbuf, len); -} - CopyState -BeginForeignCopyTo(Relation rel) +BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb) { CopyState cstate; @@ -1990,11 +1981,10 @@ BeginCopyTo(ParseState *pstate, return cstate; } -char * +void NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot) { CopyOneRowTo(cstate, slot); - return buf; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index ef119a7..f31ed13 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -42,8 +42,8 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); -extern CopyState BeginForeignCopyTo(Relation rel); -extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern CopyState BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb); +extern void NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); extern void EndForeignCopyTo(CopyState cstate); #endif /* COPY_H */