contrib/file_fdw/file_fdw.c | 97 ++++++++++++++++++++++++++++++++++ src/backend/executor/execMain.c | 34 ++++++++++-- src/backend/executor/nodeModifyTable.c | 65 +++++++++++++++++++++-- src/backend/rewrite/rewriteHandler.c | 3 +- src/include/foreign/fdwapi.h | 19 +++++++ src/include/nodes/execnodes.h | 4 ++ src/include/storage/itemptr.h | 25 +++++++++ 7 files changed, 237 insertions(+), 10 deletions(-) diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 7c7fedf..ef9f4b2 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -33,6 +33,7 @@ #include "optimizer/var.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/lsyscache.h" PG_MODULE_MAGIC; @@ -96,6 +97,7 @@ typedef struct FileFdwExecutionState char *filename; /* file to read */ List *options; /* merged COPY options, excluding filename */ CopyState cstate; /* state of reading file */ + int lineno; /* pseudo ctid of the file */ } FileFdwExecutionState; /* @@ -130,6 +132,19 @@ static void fileEndForeignScan(ForeignScanState *node); static bool fileAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); +static void fileBeginForeignModify(CmdType operation, + ResultRelInfo *resultRelInfo, + EState *estate, + Plan *subplan, + int eflags); +static void fileExecForeignInsert(ResultRelInfo *resultRelInfo, + HeapTuple tuple); +static void fileExecForeignDelete(ResultRelInfo *resultRelInfo, + ItemPointer tupleid); +static void fileExecForeignUpdate(ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + HeapTuple tuple); +static void fileEndForeignModify(ResultRelInfo *resultRelInfo); /* * Helper functions @@ -169,6 +184,11 @@ file_fdw_handler(PG_FUNCTION_ARGS) fdwroutine->ReScanForeignScan = fileReScanForeignScan; fdwroutine->EndForeignScan = fileEndForeignScan; fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable; + fdwroutine->BeginForeignModify = fileBeginForeignModify; + fdwroutine->ExecForeignInsert = fileExecForeignInsert; + fdwroutine->ExecForeignDelete = fileExecForeignDelete; + fdwroutine->ExecForeignUpdate = fileExecForeignUpdate; + fdwroutine->EndForeignModify = fileEndForeignModify; PG_RETURN_POINTER(fdwroutine); } @@ -510,6 +530,36 @@ fileGetForeignPlan(PlannerInfo *root, List *scan_clauses) { Index scan_relid = baserel->relid; + ListCell *cell; + + /* + * XXX - An evidence FDW module can know what kind of accesses are + * requires on the target relation, if UPDATE, INSERT or DELETE. + * It shall be also utilized to appropriate lock level on FDW + * extensions that performs behalf on real RDBMS. + */ + if (root->parse->resultRelation == baserel->relid) + { + switch (root->parse->commandType) + { + case CMD_INSERT: + elog(INFO, "%s is the target relation of INSERT", + get_rel_name(foreigntableid)); + break; + case CMD_UPDATE: + elog(INFO, "%s is the target relation of UPDATE", + get_rel_name(foreigntableid)); + break; + case CMD_DELETE: + elog(INFO, "%s is the target relation of DELETE", + get_rel_name(foreigntableid)); + break; + default: + elog(INFO, "%s is the target relation of ??????", + get_rel_name(foreigntableid)); + break; + } + } /* * We have no native ability to evaluate restriction clauses, so we just @@ -598,6 +648,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) festate->filename = filename; festate->options = options; festate->cstate = cstate; + festate->lineno = 0; node->fdw_state = (void *) festate; } @@ -612,6 +663,7 @@ fileIterateForeignScan(ForeignScanState *node) { FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + HeapTuple tup; bool found; ErrorContextCallback errcontext; @@ -638,7 +690,12 @@ fileIterateForeignScan(ForeignScanState *node) slot->tts_values, slot->tts_isnull, NULL); if (found) + { ExecStoreVirtualTuple(slot); + tup = ExecMaterializeSlot(slot); + festate->lineno++; + PackPseudoItemPointer(&tup->t_self, festate->lineno); + } /* Remove error callback. */ error_context_stack = errcontext.previous; @@ -661,6 +718,7 @@ fileReScanForeignScan(ForeignScanState *node) festate->filename, NIL, festate->options); + festate->lineno = 0; } /* @@ -716,6 +774,45 @@ fileAnalyzeForeignTable(Relation relation, return true; } +static void +fileBeginForeignModify(CmdType operation, + ResultRelInfo *resultRelInfo, + EState *estate, + Plan *subplan, + int eflags) +{ + elog(INFO, "fdw_file: BeginForeignModify method"); +} + +static void +fileExecForeignInsert(ResultRelInfo *resultRelInfo, HeapTuple tuple) +{ + elog(INFO, "fdw_file: INSERT"); +} + +static void +fileExecForeignDelete(ResultRelInfo *resultRelInfo, + ItemPointer tupleid) +{ + elog(INFO, "fdw_file: DELETE (lineno = %lu)", + (unsigned long) UnpackPseudoItemPointer(tupleid)); +} + +static void +fileExecForeignUpdate(ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + HeapTuple tuple) +{ + elog(INFO, "fdw_file: UPDATE (lineno = %lu)", + (unsigned long) UnpackPseudoItemPointer(tupleid)); +} + +static void +fileEndForeignModify(ResultRelInfo *resultRelInfo) +{ + elog(INFO, "fdw_file: EndForeignModify method"); +} + /* * check_selective_binary_conversion * diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 440438b..7d20627 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -43,6 +43,7 @@ #include "catalog/namespace.h" #include "commands/trigger.h" #include "executor/execdebug.h" +#include "foreign/fdwapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "optimizer/clauses.h" @@ -933,6 +934,7 @@ void CheckValidResultRel(Relation resultRel, CmdType operation) { TriggerDesc *trigDesc = resultRel->trigdesc; + FdwRoutine *fdwroutine; switch (resultRel->rd_rel->relkind) { @@ -984,10 +986,34 @@ CheckValidResultRel(Relation resultRel, CmdType operation) } break; case RELKIND_FOREIGN_TABLE: - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot change foreign table \"%s\"", - RelationGetRelationName(resultRel)))); + fdwroutine = GetFdwRoutineByRelId(RelationGetRelid(resultRel)); + switch (operation) + { + case CMD_INSERT: + if (!fdwroutine || !fdwroutine->ExecForeignInsert) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot insert into foreign table \"%s\"", + RelationGetRelationName(resultRel)))); + break; + case CMD_UPDATE: + if (!fdwroutine || !fdwroutine->ExecForeignUpdate) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot update foreign table \"\"", + RelationGetRelationName(resultRel)))); + break; + case CMD_DELETE: + if (!fdwroutine || !fdwroutine->ExecForeignDelete) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot delete from foreign table \"\"", + RelationGetRelationName(resultRel)))); + break; + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } break; default: ereport(ERROR, diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index a7bce75..b2fc516 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -41,6 +41,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" +#include "foreign/fdwapi.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "storage/bufmgr.h" @@ -224,6 +225,14 @@ ExecInsert(TupleTableSlot *slot, newId = InvalidOid; } + else if (resultRelInfo->ri_fdwroutine) + { + FdwRoutine *fdwroutine = resultRelInfo->ri_fdwroutine; + + fdwroutine->ExecForeignInsert(resultRelInfo, tuple); + + newId = InvalidOid; + } else { /* @@ -334,6 +343,12 @@ ExecDelete(ItemPointer tupleid, if (!dodelete) /* "do nothing" */ return NULL; } + else if (resultRelInfo->ri_fdwroutine) + { + FdwRoutine *fdwroutine = resultRelInfo->ri_fdwroutine; + + fdwroutine->ExecForeignDelete(resultRelInfo, tupleid); + } else { /* @@ -538,6 +553,12 @@ ExecUpdate(ItemPointer tupleid, /* trigger might have changed tuple */ tuple = ExecMaterializeSlot(slot); } + else if (resultRelInfo->ri_fdwroutine) + { + FdwRoutine *fdwroutine = resultRelInfo->ri_fdwroutine; + + fdwroutine->ExecForeignUpdate(resultRelInfo, tupleid, tuple); + } else { /* @@ -805,10 +826,12 @@ ExecModifyTable(ModifyTableState *node) */ if (operation == CMD_UPDATE || operation == CMD_DELETE) { + Relation rel = resultRelInfo->ri_RelationDesc; Datum datum; bool isNull; - if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_RELATION) + if (RelationGetForm(rel)->relkind == RELKIND_RELATION || + RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE) { datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, @@ -964,6 +987,22 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) estate->es_result_relation_info = resultRelInfo; mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); + /* + * Also tells FDW extensions to init the plan for this result rel + */ + if (RelationGetForm(resultRelInfo->ri_RelationDesc)->relkind == RELKIND_FOREIGN_TABLE) + { + Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + FdwRoutine *fdwroutine = GetFdwRoutineByRelId(relid); + + Assert(fdwroutine != NULL); + resultRelInfo->ri_fdwroutine = fdwroutine; + resultRelInfo->ri_fdw_state = NULL; + + if (fdwroutine->BeginForeignModify) + fdwroutine->BeginForeignModify(operation, resultRelInfo, + estate, subplan, eflags); + } resultRelInfo++; i++; } @@ -1104,21 +1143,22 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) resultRelInfo = mtstate->resultRelInfo; for (i = 0; i < nplans; i++) { + Relation rel = resultRelInfo->ri_RelationDesc; JunkFilter *j; subplan = mtstate->mt_plans[i]->plan; if (operation == CMD_INSERT || operation == CMD_UPDATE) - ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, - subplan->targetlist); + ExecCheckPlanOutput(rel, subplan->targetlist); j = ExecInitJunkFilter(subplan->targetlist, - resultRelInfo->ri_RelationDesc->rd_att->tdhasoid, + RelationGetDescr(rel)->tdhasoid, ExecInitExtraTupleSlot(estate)); if (operation == CMD_UPDATE || operation == CMD_DELETE) { /* For UPDATE/DELETE, find the appropriate junk attr now */ - if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_RELATION) + if (RelationGetForm(rel)->relkind == RELKIND_RELATION || + RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE) { j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); if (!AttributeNumberIsValid(j->jf_junkAttNo)) @@ -1181,6 +1221,21 @@ ExecEndModifyTable(ModifyTableState *node) { int i; + /* Let the FDW shut dowm */ + for (i=0; i < node->ps.state->es_num_result_relations; i++) + { + ResultRelInfo *resultRelInfo + = &node->ps.state->es_result_relations[i]; + + if (resultRelInfo->ri_fdwroutine) + { + FdwRoutine *fdwroutine = resultRelInfo->ri_fdwroutine; + + if (fdwroutine->EndForeignModify) + fdwroutine->EndForeignModify(resultRelInfo); + } + } + /* * Free the exprcontext */ diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 8f75948..f461b87 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1166,7 +1166,8 @@ rewriteTargetListUD(Query *parsetree, RangeTblEntry *target_rte, const char *attrname; TargetEntry *tle; - if (target_relation->rd_rel->relkind == RELKIND_RELATION) + if (target_relation->rd_rel->relkind == RELKIND_RELATION || + target_relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE) { /* * Emit CTID so that executor can find the row to update or delete. diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 721cd25..252d505 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -59,6 +59,20 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); +typedef void (*BeginForeignModify_function) (CmdType operation, + ResultRelInfo *resultRelInfo, + EState *estate, + Plan *subplan, + int eflags); +typedef void (*ExecForeignInsert_function) (ResultRelInfo *resultRelInfo, + HeapTuple tuple); +typedef void (*ExecForeignDelete_function) (ResultRelInfo *resultRelInfo, + ItemPointer tupleid); +typedef void (*ExecForeignUpdate_function) (ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + HeapTuple tuple); +typedef void (*EndForeignModify_function) (ResultRelInfo *resultRelInfo); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -90,6 +104,11 @@ typedef struct FdwRoutine * not provided. */ AnalyzeForeignTable_function AnalyzeForeignTable; + BeginForeignModify_function BeginForeignModify; + ExecForeignInsert_function ExecForeignInsert; + ExecForeignDelete_function ExecForeignDelete; + ExecForeignUpdate_function ExecForeignUpdate; + EndForeignModify_function EndForeignModify; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index fec07b8..db5b79b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -303,6 +303,8 @@ typedef struct JunkFilter * ConstraintExprs array of constraint-checking expr states * junkFilter for removing junk attributes from tuples * projectReturning for computing a RETURNING list + * fdwroutine FDW callbacks if foreign table + * fdw_state opaque state of FDW module, or NULL * ---------------- */ typedef struct ResultRelInfo @@ -320,6 +322,8 @@ typedef struct ResultRelInfo List **ri_ConstraintExprs; JunkFilter *ri_junkFilter; ProjectionInfo *ri_projectReturning; + struct FdwRoutine *ri_fdwroutine; + void *ri_fdw_state; } ResultRelInfo; /* ---------------- diff --git a/src/include/storage/itemptr.h b/src/include/storage/itemptr.h index 331812b..8fdee45 100644 --- a/src/include/storage/itemptr.h +++ b/src/include/storage/itemptr.h @@ -135,6 +135,31 @@ typedef ItemPointerData *ItemPointer; (pointer)->ip_posid = InvalidOffsetNumber \ ) +/* + * PackPseudoItemPointer + * Pack a pseudo item pointer of foreign table + */ +#define PackPseudoItemPointer(pointer,value) \ + do { \ + AssertMacro((value) < (1UL << (8 * (sizeof(BlockIdData) + \ + sizeof(OffsetNumber))))); \ + BlockIdSet(&((pointer)->ip_blkid), \ + (value) >> (8 * sizeof(OffsetNumber))); \ + (pointer)->ip_posid = \ + (value) & ((1 << (8 * sizeof(OffsetNumber))) - 1); \ + } while(0) + +/* + * UnpackPseudoItemPointer + * Unpack a pseudo item pointer of foreign table + */ +#define UnpackPseudoItemPointer(pointer) \ + ( \ + (BlockIdGetBlockNumber(&(pointer)->ip_blkid) \ + << (8 * sizeof(OffsetNumber))) \ + | ((pointer)->ip_posid) \ + ) + /* ---------------- * externs * ----------------