Thread: WIP 2 interpreters for plperl

WIP 2 interpreters for plperl

From
Andrew Dunstan
Date:
I have made some progress with what I think is needed to have two
interpreters for plperl. This is a lot harder than the pltcl case for
two reasons: 1. there are no restrictions on having 2 tcl interpreters,
and 2. tcl does not need to save and restore context as we have to do
with perl. I think I have a conceptual solution to these two problems,
but what I have is currently segfaulting somewhat mysteriously. Tracing a
dynamically loaded library in a postgres backend with a debugger is less
than fun, too. I am attaching what I currently have, liberally sprinkled
with elog(NOTICE) calls as trace writes.

I need to get some other work done today too, so I will return to this
later if I can. In the meanwhile, if anybody cares to cast a fresh set
of eyeballs over this, please do.

cheers

andrew

Index: plperl.c
===================================================================
RCS file: /cvsroot/pgsql/src/pl/plperl/plperl.c,v
retrieving revision 1.121
diff -c -r1.121 plperl.c
*** plperl.c    19 Oct 2006 18:32:47 -0000    1.121
--- plperl.c    5 Nov 2006 20:27:32 -0000
***************
*** 27,32 ****
--- 27,33 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/typcache.h"
+ #include "utils/hsearch.h"

  /* perl stuff */
  #include "plperl.h"
***************
*** 55,60 ****
--- 56,69 ----
      SV           *reference;
  } plperl_proc_desc;

+ /* hash table entry for proc desc  */
+
+ typedef struct plperl_proc_entry
+ {
+     char proc_name[NAMEDATALEN];
+     plperl_proc_desc *proc_data;
+ } plperl_proc_entry;
+
  /*
   * The information we cache for the duration of a single call to a
   * function.
***************
*** 82,94 ****
      Oid           *argtypioparams;
  } plperl_query_desc;

  /**********************************************************************
   * Global data
   **********************************************************************/
  static bool plperl_safe_init_done = false;
! static PerlInterpreter *plperl_interp = NULL;
! static HV  *plperl_proc_hash = NULL;
! static HV  *plperl_query_hash = NULL;

  static bool plperl_use_strict = false;

--- 91,128 ----
      Oid           *argtypioparams;
  } plperl_query_desc;

+ /* hash table entry for query desc  */
+
+ typedef struct plperl_query_entry
+ {
+     char query_name[NAMEDATALEN];
+     plperl_query_desc *query_data;
+ } plperl_query_entry;
+
  /**********************************************************************
   * Global data
   **********************************************************************/
+
+ typedef enum
+ {
+     INTERP_NONE,
+     INTERP_HELD,
+     INTERP_TRUSTED,
+     INTERP_UNTRUSTED,
+     INTERP_BOTH
+ } InterpState;
+
+ static InterpState interp_state = INTERP_NONE;
+ static bool can_run_two = false;
+
  static bool plperl_safe_init_done = false;
! static PerlInterpreter *plperl_trusted_interp = NULL;
! static PerlInterpreter *plperl_untrusted_interp = NULL;
! static PerlInterpreter *plperl_held_interp = NULL;
! static bool can_run_two;
! static bool trusted_context;
! static HTAB  *plperl_proc_hash = NULL;
! static HTAB  *plperl_query_hash = NULL;

  static bool plperl_use_strict = false;

***************
*** 144,149 ****
--- 178,184 ----
  {
      /* Be sure we do initialization only once (should be redundant now) */
      static bool inited = false;
+     HASHCTL     hash_ctl;

      if (inited)
          return;
***************
*** 157,162 ****
--- 192,213 ----

      EmitWarningsOnPlaceholders("plperl");

+     MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+
+     hash_ctl.keysize = NAMEDATALEN;
+     hash_ctl.entrysize = sizeof(plperl_proc_entry);
+
+     plperl_proc_hash = hash_create("PLPerl Procedures",
+                                    32,
+                                    &hash_ctl,
+                                    HASH_ELEM);
+
+     hash_ctl.entrysize = sizeof(plperl_query_entry);
+     plperl_query_hash = hash_create("PLPerl Queries",
+                                     32,
+                                     &hash_ctl,
+                                     HASH_ELEM);
+
      plperl_init_interp();

      inited = true;
***************
*** 235,240 ****
--- 286,381 ----
      "      elog(ERROR,'trusted Perl functions disabled - " \
      "      please upgrade Perl Safe module to version 2.09 or later');}]); }"

+ #define TEST_FOR_MULTI \
+     "use Config; " \
+     "$Config{usemultiplicity} eq 'define' or "  \
+     "($Config{usethreads} eq 'define' " \
+     " and $Config{useithreads} eq 'define')"
+
+
+ /********************************************************************
+  *
+  * We start out by creating a "held" interpreter that we can use in
+  * trusted or untrusted mode (but not both) as the need arises. Later, we
+  * assign that interpreter if it is available to either the trusted or
+  * untrusted interpreter. If it has already been assigned, and we need to
+  * create the other interpreter, we do that if we can, or error out.
+  * We detect if it is safe to run two interpreters during the setup of the
+  * dummy interpreter.
+  */
+
+
+ static void
+ check_interp(bool trusted)
+ {
+     elog(NOTICE,"starting check_interp");
+     if (interp_state == INTERP_HELD)
+     {
+         if (trusted)
+         {
+             plperl_trusted_interp = plperl_held_interp;
+             interp_state = INTERP_TRUSTED;
+         }
+         else
+         {
+             plperl_untrusted_interp = plperl_held_interp;
+             interp_state = INTERP_UNTRUSTED;
+         }
+         plperl_held_interp = NULL;
+         trusted_context = trusted;
+     }
+     else if (interp_state == INTERP_BOTH ||
+              (trusted && interp_state == INTERP_TRUSTED) ||
+              (!trusted && interp_state == INTERP_UNTRUSTED))
+     {
+         if (trusted_context != trusted)
+         {
+             if (trusted)
+                 PERL_SET_CONTEXT(plperl_trusted_interp);
+             else
+                 PERL_SET_CONTEXT(plperl_untrusted_interp);
+             trusted_context = trusted;
+         }
+     }
+     else if (can_run_two)
+     {
+         PERL_SET_CONTEXT(plperl_held_interp);
+         plperl_init_interp();
+         if (trusted)
+             plperl_trusted_interp = plperl_held_interp;
+         else
+             plperl_untrusted_interp = plperl_held_interp;
+         interp_state = INTERP_BOTH;
+         plperl_held_interp = NULL;
+         trusted_context = trusted;
+     }
+     else
+     {
+         elog(ERROR,
+              "can not allocate second Perl interpreter on this platform");
+
+     }
+     elog(NOTICE,"leaving check_interp");
+
+ }
+
+
+ static void
+ restore_context (bool old_context)
+ {
+     elog(NOTICE,"starting restore_context");
+
+     if (trusted_context != old_context)
+     {
+         if (old_context)
+             PERL_SET_CONTEXT(plperl_trusted_interp);
+         else
+             PERL_SET_CONTEXT(plperl_untrusted_interp);
+         trusted_context = old_context;
+     }
+     elog(NOTICE,"leaving restore_context");
+
+ }

  static void
  plperl_init_interp(void)
***************
*** 285,301 ****
      save_time = loc ? pstrdup(loc) : NULL;
  #endif

!     plperl_interp = perl_alloc();
!     if (!plperl_interp)
          elog(ERROR, "could not allocate Perl interpreter");

!     perl_construct(plperl_interp);
!     perl_parse(plperl_interp, plperl_init_shared_libs, 3, embedding, NULL);
!     perl_run(plperl_interp);

!     plperl_proc_hash = newHV();
!     plperl_query_hash = newHV();

  #ifdef WIN32

      eval_pv("use POSIX qw(locale_h);", TRUE);    /* croak on failure */
--- 426,452 ----
      save_time = loc ? pstrdup(loc) : NULL;
  #endif

!
!     elog(NOTICE,"starting init_interp");
!     plperl_held_interp = perl_alloc();
!     if (!plperl_held_interp)
          elog(ERROR, "could not allocate Perl interpreter");

!     perl_construct(plperl_held_interp);
!     perl_parse(plperl_held_interp, plperl_init_shared_libs,
!                3, embedding, NULL);
!     perl_run(plperl_held_interp);

!     if (interp_state == INTERP_NONE)
!     {
!         SV *res;

+         res = eval_pv(TEST_FOR_MULTI,TRUE);
+         can_run_two = SvIV(res);
+         interp_state = INTERP_HELD;
+     }
+
+     elog(NOTICE,"leaving init_interp");
  #ifdef WIN32

      eval_pv("use POSIX qw(locale_h);", TRUE);    /* croak on failure */
***************
*** 753,758 ****
--- 904,913 ----
      SV           *subref;
      int            count;
      char       *compile_sub;
+     bool       oldcontext = trusted_context;
+
+     elog(NOTICE,"starting create_sub");
+     check_interp(trusted);

      if (trusted && !plperl_safe_init_done)
      {
***************
*** 828,833 ****
--- 983,991 ----
      FREETMPS;
      LEAVE;

+     restore_context(oldcontext);
+     elog(NOTICE,"leaving create_sub");
+
      return subref;
  }

***************
*** 1009,1015 ****
--- 1167,1175 ----
      Datum        retval;
      ReturnSetInfo *rsi;
      SV           *array_ret = NULL;
+     bool       oldcontext = trusted_context;

+     elog(NOTICE,"starting plperl_func_handler");
      /*
       * Create the call_data beforing connecting to SPI, so that it is not
       * allocated in the SPI memory context
***************
*** 1037,1042 ****
--- 1197,1204 ----
                              "cannot accept a set")));
      }

+     check_interp(prodesc->lanpltrusted);
+
      perlret = plperl_call_perl_func(prodesc, fcinfo);

      /************************************************************
***************
*** 1146,1151 ****
--- 1308,1316 ----
          SvREFCNT_dec(perlret);

      current_call_data = NULL;
+     restore_context(oldcontext);
+     elog(NOTICE,"leaving plperl__func_handler");
+
      return retval;
  }

***************
*** 1158,1163 ****
--- 1323,1329 ----
      Datum        retval;
      SV           *svTD;
      HV           *hvTD;
+     bool       oldcontext = trusted_context;

      /*
       * Create the call_data beforing connecting to SPI, so that it is not
***************
*** 1174,1179 ****
--- 1340,1347 ----
      prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
      current_call_data->prodesc = prodesc;

+     check_interp(prodesc->lanpltrusted);
+
      svTD = plperl_trigger_build_args(fcinfo);
      perlret = plperl_call_perl_trigger_func(prodesc, fcinfo, svTD);
      hvTD = (HV *) SvRV(svTD);
***************
*** 1244,1249 ****
--- 1412,1418 ----
          SvREFCNT_dec(perlret);

      current_call_data = NULL;
+     restore_context(oldcontext);
      return retval;
  }

***************
*** 1256,1262 ****
      char        internal_proname[64];
      plperl_proc_desc *prodesc = NULL;
      int            i;
!     SV          **svp;

      /* We'll need the pg_proc tuple in any case... */
      procTup = SearchSysCache(PROCOID,
--- 1425,1434 ----
      char        internal_proname[64];
      plperl_proc_desc *prodesc = NULL;
      int            i;
!     plperl_proc_entry *hash_entry;
!     bool found;
!
!     elog(NOTICE,"starting compile_plperl_function");

      /* We'll need the pg_proc tuple in any case... */
      procTup = SearchSysCache(PROCOID,
***************
*** 1274,1288 ****
      else
          sprintf(internal_proname, "__PLPerl_proc_%u_trigger", fn_oid);

      /************************************************************
       * Lookup the internal proc name in the hashtable
       ************************************************************/
!     svp = hv_fetch_string(plperl_proc_hash, internal_proname);
!     if (svp)
      {
          bool        uptodate;

!         prodesc = INT2PTR(plperl_proc_desc *, SvUV(*svp));

          /************************************************************
           * If it's present, must check whether it's still up to date.
--- 1446,1466 ----
      else
          sprintf(internal_proname, "__PLPerl_proc_%u_trigger", fn_oid);

+     elog(NOTICE,"looking for function");
      /************************************************************
       * Lookup the internal proc name in the hashtable
       ************************************************************/
!     hash_entry = hash_search(plperl_proc_hash, internal_proname,
!                              HASH_FIND, NULL);
!     elog(NOTICE,"lookup finished");
!
!     if (hash_entry)
      {
          bool        uptodate;

!         elog(NOTICE,"function exists");
!
!         prodesc = hash_entry->proc_data;

          /************************************************************
           * If it's present, must check whether it's still up to date.
***************
*** 1294,1301 ****

          if (!uptodate)
          {
!             /* need we delete old entry? */
              prodesc = NULL;
          }
      }

--- 1472,1485 ----

          if (!uptodate)
          {
!             elog(NOTICE,"function out of date ... removing");
!
!             free(prodesc); /* are we leaking memory here? */
              prodesc = NULL;
+             hash_search(plperl_proc_hash, internal_proname,
+                         HASH_REMOVE,NULL);
+             elog(NOTICE,"removal complete");
+
          }
      }

***************
*** 1469,1474 ****
--- 1653,1660 ----
          /************************************************************
           * Create the procedure in the interpreter
           ************************************************************/
+         elog(NOTICE,"creating subroutine");
+
          prodesc->reference = plperl_create_sub(proc_source, prodesc->lanpltrusted);
          pfree(proc_source);
          if (!prodesc->reference)    /* can this happen? */
***************
*** 1479,1490 ****
                   internal_proname);
          }

!         hv_store_string(plperl_proc_hash, internal_proname,
!                         newSVuv(PTR2UV(prodesc)));
      }

      ReleaseSysCache(procTup);

      return prodesc;
  }

--- 1665,1681 ----
                   internal_proname);
          }

!         elog(NOTICE,"storing new entry");
!
!         hash_entry = hash_search(plperl_proc_hash, internal_proname,
!                                  HASH_ENTER, &found);
!         hash_entry->proc_data = prodesc;
      }

      ReleaseSysCache(procTup);

+     elog(NOTICE,"leaving compile_plperl_func");
+
      return prodesc;
  }

***************
*** 1939,1944 ****
--- 2130,2137 ----
  plperl_spi_prepare(char *query, int argc, SV **argv)
  {
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;
+     bool        found;
      void       *plan;
      int            i;

***************
*** 2051,2057 ****
       * Insert a hashtable entry for the plan and return
       * the key to the caller.
       ************************************************************/
!     hv_store_string(plperl_query_hash, qdesc->qname, newSVuv(PTR2UV(qdesc)));

      return newSVstring(qdesc->qname);
  }
--- 2244,2253 ----
       * Insert a hashtable entry for the plan and return
       * the key to the caller.
       ************************************************************/
!
!     hash_entry = hash_search(plperl_query_hash, qdesc->qname,
!                              HASH_ENTER,&found);
!     hash_entry->query_data = qdesc;

      return newSVstring(qdesc->qname);
  }
***************
*** 2067,2072 ****
--- 2263,2269 ----
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;

      /*
       * Execute the query inside a sub-transaction, so we can cope with errors
***************
*** 2084,2096 ****
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         sv = hv_fetch_string(plperl_query_hash, query);
!         if (sv == NULL)
              elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
-         if (*sv == NULL || !SvOK(*sv))
-             elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value corrupted");

!         qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
          if (qdesc == NULL)
              elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");

--- 2281,2294 ----
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!
!         hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!         if (hash_entry == NULL)
              elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");

!         qdesc = hash_entry->query_data;
!
          if (qdesc == NULL)
              elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");

***************
*** 2201,2211 ****
  SV *
  plperl_spi_query_prepared(char *query, int argc, SV **argv)
  {
-     SV          **sv;
      int            i;
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
      SV           *cursor;
      Portal        portal = NULL;

--- 2399,2409 ----
  SV *
  plperl_spi_query_prepared(char *query, int argc, SV **argv)
  {
      int            i;
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;
      SV           *cursor;
      Portal        portal = NULL;

***************
*** 2225,2237 ****
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         sv = hv_fetch_string(plperl_query_hash, query);
!         if (sv == NULL)
!             elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
!         if (*sv == NULL || !SvOK(*sv))
!             elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value corrupted");

-         qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
          if (qdesc == NULL)
              elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");

--- 2423,2435 ----
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!         if (hash_entry == NULL)
!             elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
!
!         qdesc = hash_entry->query_data;

          if (qdesc == NULL)
              elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");

***************
*** 2335,2351 ****
  void
  plperl_spi_freeplan(char *query)
  {
-     SV          **sv;
      void       *plan;
      plperl_query_desc *qdesc;

!     sv = hv_fetch_string(plperl_query_hash, query);
!     if (sv == NULL)
!         elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
!     if (*sv == NULL || !SvOK(*sv))
!         elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value corrupted");

-     qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
      if (qdesc == NULL)
          elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");

--- 2533,2549 ----
  void
  plperl_spi_freeplan(char *query)
  {
      void       *plan;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;

!     hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!     if (hash_entry == NULL)
!         elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
!
!     qdesc = hash_entry->query_data;

      if (qdesc == NULL)
          elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");

***************
*** 2353,2359 ****
       * free all memory before SPI_freeplan, so if it dies, nothing will be
       * left over
       */
!     hv_delete(plperl_query_hash, query, strlen(query), G_DISCARD);
      plan = qdesc->plan;
      free(qdesc->argtypes);
      free(qdesc->arginfuncs);
--- 2551,2559 ----
       * free all memory before SPI_freeplan, so if it dies, nothing will be
       * left over
       */
!     hash_search(plperl_query_hash, query,
!                 HASH_REMOVE,NULL);
!
      plan = qdesc->plan;
      free(qdesc->argtypes);
      free(qdesc->arginfuncs);

Re: WIP 2 interpreters for plperl

From
Andrew Dunstan
Date:

I wrote:
>
> I have made some progress with what I think is needed to have two
> interpreters for plperl. This is a lot harder than the pltcl case for
> two reasons: 1. there are no restrictions on having 2 tcl
> interpreters, and 2. tcl does not need to save and restore context as
> we have to do with perl. I think I have a conceptual solution to
> these two problems, but what I have is currently segfaulting somewhat
> mysteriously. Tracing a dynamically loaded library in a postgres
> backend with a debugger is less than fun, too. I am attaching what I
> currently have, liberally sprinkled with elog(NOTICE) calls as trace
> writes.
>
>

With a little more perseverance I found the problem. The attached patch
passes regression. But it now needs plenty of eyeballs and testing.

cheers

andrew
Index: plperl.c
===================================================================
RCS file: /cvsroot/pgsql/src/pl/plperl/plperl.c,v
retrieving revision 1.121
diff -c -r1.121 plperl.c
*** plperl.c    19 Oct 2006 18:32:47 -0000    1.121
--- plperl.c    5 Nov 2006 22:20:16 -0000
***************
*** 27,32 ****
--- 27,33 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/typcache.h"
+ #include "utils/hsearch.h"

  /* perl stuff */
  #include "plperl.h"
***************
*** 55,60 ****
--- 56,69 ----
      SV           *reference;
  } plperl_proc_desc;

+ /* hash table entry for proc desc  */
+
+ typedef struct plperl_proc_entry
+ {
+     char proc_name[NAMEDATALEN];
+     plperl_proc_desc *proc_data;
+ } plperl_proc_entry;
+
  /*
   * The information we cache for the duration of a single call to a
   * function.
***************
*** 82,94 ****
      Oid           *argtypioparams;
  } plperl_query_desc;

  /**********************************************************************
   * Global data
   **********************************************************************/
  static bool plperl_safe_init_done = false;
! static PerlInterpreter *plperl_interp = NULL;
! static HV  *plperl_proc_hash = NULL;
! static HV  *plperl_query_hash = NULL;

  static bool plperl_use_strict = false;

--- 91,128 ----
      Oid           *argtypioparams;
  } plperl_query_desc;

+ /* hash table entry for query desc  */
+
+ typedef struct plperl_query_entry
+ {
+     char query_name[NAMEDATALEN];
+     plperl_query_desc *query_data;
+ } plperl_query_entry;
+
  /**********************************************************************
   * Global data
   **********************************************************************/
+
+ typedef enum
+ {
+     INTERP_NONE,
+     INTERP_HELD,
+     INTERP_TRUSTED,
+     INTERP_UNTRUSTED,
+     INTERP_BOTH
+ } InterpState;
+
+ static InterpState interp_state = INTERP_NONE;
+ static bool can_run_two = false;
+
  static bool plperl_safe_init_done = false;
! static PerlInterpreter *plperl_trusted_interp = NULL;
! static PerlInterpreter *plperl_untrusted_interp = NULL;
! static PerlInterpreter *plperl_held_interp = NULL;
! static bool can_run_two;
! static bool trusted_context;
! static HTAB  *plperl_proc_hash = NULL;
! static HTAB  *plperl_query_hash = NULL;

  static bool plperl_use_strict = false;

***************
*** 144,149 ****
--- 178,184 ----
  {
      /* Be sure we do initialization only once (should be redundant now) */
      static bool inited = false;
+     HASHCTL     hash_ctl;

      if (inited)
          return;
***************
*** 157,162 ****
--- 192,213 ----

      EmitWarningsOnPlaceholders("plperl");

+     MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+
+     hash_ctl.keysize = NAMEDATALEN;
+     hash_ctl.entrysize = sizeof(plperl_proc_entry);
+
+     plperl_proc_hash = hash_create("PLPerl Procedures",
+                                    32,
+                                    &hash_ctl,
+                                    HASH_ELEM);
+
+     hash_ctl.entrysize = sizeof(plperl_query_entry);
+     plperl_query_hash = hash_create("PLPerl Queries",
+                                     32,
+                                     &hash_ctl,
+                                     HASH_ELEM);
+
      plperl_init_interp();

      inited = true;
***************
*** 235,240 ****
--- 286,375 ----
      "      elog(ERROR,'trusted Perl functions disabled - " \
      "      please upgrade Perl Safe module to version 2.09 or later');}]); }"

+ #define TEST_FOR_MULTI \
+     "use Config; " \
+     "$Config{usemultiplicity} eq 'define' or "  \
+     "($Config{usethreads} eq 'define' " \
+     " and $Config{useithreads} eq 'define')"
+
+
+ /********************************************************************
+  *
+  * We start out by creating a "held" interpreter that we can use in
+  * trusted or untrusted mode (but not both) as the need arises. Later, we
+  * assign that interpreter if it is available to either the trusted or
+  * untrusted interpreter. If it has already been assigned, and we need to
+  * create the other interpreter, we do that if we can, or error out.
+  * We detect if it is safe to run two interpreters during the setup of the
+  * dummy interpreter.
+  */
+
+
+ static void
+ check_interp(bool trusted)
+ {
+     if (interp_state == INTERP_HELD)
+     {
+         if (trusted)
+         {
+             plperl_trusted_interp = plperl_held_interp;
+             interp_state = INTERP_TRUSTED;
+         }
+         else
+         {
+             plperl_untrusted_interp = plperl_held_interp;
+             interp_state = INTERP_UNTRUSTED;
+         }
+         plperl_held_interp = NULL;
+         trusted_context = trusted;
+     }
+     else if (interp_state == INTERP_BOTH ||
+              (trusted && interp_state == INTERP_TRUSTED) ||
+              (!trusted && interp_state == INTERP_UNTRUSTED))
+     {
+         if (trusted_context != trusted)
+         {
+             if (trusted)
+                 PERL_SET_CONTEXT(plperl_trusted_interp);
+             else
+                 PERL_SET_CONTEXT(plperl_untrusted_interp);
+             trusted_context = trusted;
+         }
+     }
+     else if (can_run_two)
+     {
+         PERL_SET_CONTEXT(plperl_held_interp);
+         plperl_init_interp();
+         if (trusted)
+             plperl_trusted_interp = plperl_held_interp;
+         else
+             plperl_untrusted_interp = plperl_held_interp;
+         interp_state = INTERP_BOTH;
+         plperl_held_interp = NULL;
+         trusted_context = trusted;
+     }
+     else
+     {
+         elog(ERROR,
+              "can not allocate second Perl interpreter on this platform");
+
+     }
+
+ }
+
+
+ static void
+ restore_context (bool old_context)
+ {
+     if (trusted_context != old_context)
+     {
+         if (old_context)
+             PERL_SET_CONTEXT(plperl_trusted_interp);
+         else
+             PERL_SET_CONTEXT(plperl_untrusted_interp);
+         trusted_context = old_context;
+     }
+ }

  static void
  plperl_init_interp(void)
***************
*** 285,300 ****
      save_time = loc ? pstrdup(loc) : NULL;
  #endif

-     plperl_interp = perl_alloc();
-     if (!plperl_interp)
-         elog(ERROR, "could not allocate Perl interpreter");

!     perl_construct(plperl_interp);
!     perl_parse(plperl_interp, plperl_init_shared_libs, 3, embedding, NULL);
!     perl_run(plperl_interp);

!     plperl_proc_hash = newHV();
!     plperl_query_hash = newHV();

  #ifdef WIN32

--- 420,443 ----
      save_time = loc ? pstrdup(loc) : NULL;
  #endif


!     plperl_held_interp = perl_alloc();
!     if (!plperl_held_interp)
!         elog(ERROR, "could not allocate Perl interpreter");

!     perl_construct(plperl_held_interp);
!     perl_parse(plperl_held_interp, plperl_init_shared_libs,
!                3, embedding, NULL);
!     perl_run(plperl_held_interp);
!
!     if (interp_state == INTERP_NONE)
!     {
!         SV *res;
!
!         res = eval_pv(TEST_FOR_MULTI,TRUE);
!         can_run_two = SvIV(res);
!         interp_state = INTERP_HELD;
!     }

  #ifdef WIN32

***************
*** 1009,1014 ****
--- 1152,1158 ----
      Datum        retval;
      ReturnSetInfo *rsi;
      SV           *array_ret = NULL;
+     bool       oldcontext = trusted_context;

      /*
       * Create the call_data beforing connecting to SPI, so that it is not
***************
*** 1037,1042 ****
--- 1181,1188 ----
                              "cannot accept a set")));
      }

+     check_interp(prodesc->lanpltrusted);
+
      perlret = plperl_call_perl_func(prodesc, fcinfo);

      /************************************************************
***************
*** 1146,1151 ****
--- 1292,1299 ----
          SvREFCNT_dec(perlret);

      current_call_data = NULL;
+     restore_context(oldcontext);
+
      return retval;
  }

***************
*** 1158,1163 ****
--- 1306,1312 ----
      Datum        retval;
      SV           *svTD;
      HV           *hvTD;
+     bool       oldcontext = trusted_context;

      /*
       * Create the call_data beforing connecting to SPI, so that it is not
***************
*** 1174,1179 ****
--- 1323,1330 ----
      prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
      current_call_data->prodesc = prodesc;

+     check_interp(prodesc->lanpltrusted);
+
      svTD = plperl_trigger_build_args(fcinfo);
      perlret = plperl_call_perl_trigger_func(prodesc, fcinfo, svTD);
      hvTD = (HV *) SvRV(svTD);
***************
*** 1244,1249 ****
--- 1395,1401 ----
          SvREFCNT_dec(perlret);

      current_call_data = NULL;
+     restore_context(oldcontext);
      return retval;
  }

***************
*** 1256,1262 ****
      char        internal_proname[64];
      plperl_proc_desc *prodesc = NULL;
      int            i;
!     SV          **svp;

      /* We'll need the pg_proc tuple in any case... */
      procTup = SearchSysCache(PROCOID,
--- 1408,1416 ----
      char        internal_proname[64];
      plperl_proc_desc *prodesc = NULL;
      int            i;
!     plperl_proc_entry *hash_entry;
!     bool found;
!     bool oldcontext = trusted_context;

      /* We'll need the pg_proc tuple in any case... */
      procTup = SearchSysCache(PROCOID,
***************
*** 1277,1288 ****
      /************************************************************
       * Lookup the internal proc name in the hashtable
       ************************************************************/
!     svp = hv_fetch_string(plperl_proc_hash, internal_proname);
!     if (svp)
      {
          bool        uptodate;

!         prodesc = INT2PTR(plperl_proc_desc *, SvUV(*svp));

          /************************************************************
           * If it's present, must check whether it's still up to date.
--- 1431,1444 ----
      /************************************************************
       * Lookup the internal proc name in the hashtable
       ************************************************************/
!     hash_entry = hash_search(plperl_proc_hash, internal_proname,
!                              HASH_FIND, NULL);
!
!     if (hash_entry)
      {
          bool        uptodate;

!         prodesc = hash_entry->proc_data;

          /************************************************************
           * If it's present, must check whether it's still up to date.
***************
*** 1294,1301 ****

          if (!uptodate)
          {
!             /* need we delete old entry? */
              prodesc = NULL;
          }
      }

--- 1450,1459 ----

          if (!uptodate)
          {
!             free(prodesc); /* are we leaking memory here? */
              prodesc = NULL;
+             hash_search(plperl_proc_hash, internal_proname,
+                         HASH_REMOVE,NULL);
          }
      }

***************
*** 1469,1475 ****
--- 1627,1639 ----
          /************************************************************
           * Create the procedure in the interpreter
           ************************************************************/
+
+         check_interp(prodesc->lanpltrusted);
+
          prodesc->reference = plperl_create_sub(proc_source, prodesc->lanpltrusted);
+
+         restore_context(oldcontext);
+
          pfree(proc_source);
          if (!prodesc->reference)    /* can this happen? */
          {
***************
*** 1479,1486 ****
                   internal_proname);
          }

!         hv_store_string(plperl_proc_hash, internal_proname,
!                         newSVuv(PTR2UV(prodesc)));
      }

      ReleaseSysCache(procTup);
--- 1643,1651 ----
                   internal_proname);
          }

!         hash_entry = hash_search(plperl_proc_hash, internal_proname,
!                                  HASH_ENTER, &found);
!         hash_entry->proc_data = prodesc;
      }

      ReleaseSysCache(procTup);
***************
*** 1939,1944 ****
--- 2104,2111 ----
  plperl_spi_prepare(char *query, int argc, SV **argv)
  {
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;
+     bool        found;
      void       *plan;
      int            i;

***************
*** 2051,2057 ****
       * Insert a hashtable entry for the plan and return
       * the key to the caller.
       ************************************************************/
!     hv_store_string(plperl_query_hash, qdesc->qname, newSVuv(PTR2UV(qdesc)));

      return newSVstring(qdesc->qname);
  }
--- 2218,2227 ----
       * Insert a hashtable entry for the plan and return
       * the key to the caller.
       ************************************************************/
!
!     hash_entry = hash_search(plperl_query_hash, qdesc->qname,
!                              HASH_ENTER,&found);
!     hash_entry->query_data = qdesc;

      return newSVstring(qdesc->qname);
  }
***************
*** 2067,2072 ****
--- 2237,2243 ----
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;

      /*
       * Execute the query inside a sub-transaction, so we can cope with errors
***************
*** 2084,2096 ****
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         sv = hv_fetch_string(plperl_query_hash, query);
!         if (sv == NULL)
              elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
-         if (*sv == NULL || !SvOK(*sv))
-             elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value corrupted");

!         qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
          if (qdesc == NULL)
              elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");

--- 2255,2268 ----
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!
!         hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!         if (hash_entry == NULL)
              elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");

!         qdesc = hash_entry->query_data;
!
          if (qdesc == NULL)
              elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");

***************
*** 2201,2211 ****
  SV *
  plperl_spi_query_prepared(char *query, int argc, SV **argv)
  {
-     SV          **sv;
      int            i;
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
      SV           *cursor;
      Portal        portal = NULL;

--- 2373,2383 ----
  SV *
  plperl_spi_query_prepared(char *query, int argc, SV **argv)
  {
      int            i;
      char       *nulls;
      Datum       *argvalues;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;
      SV           *cursor;
      Portal        portal = NULL;

***************
*** 2225,2237 ****
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         sv = hv_fetch_string(plperl_query_hash, query);
!         if (sv == NULL)
!             elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
!         if (*sv == NULL || !SvOK(*sv))
!             elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value corrupted");

-         qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
          if (qdesc == NULL)
              elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");

--- 2397,2409 ----
          /************************************************************
           * Fetch the saved plan descriptor, see if it's o.k.
           ************************************************************/
!         hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!         if (hash_entry == NULL)
!             elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
!
!         qdesc = hash_entry->query_data;

          if (qdesc == NULL)
              elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");

***************
*** 2335,2351 ****
  void
  plperl_spi_freeplan(char *query)
  {
-     SV          **sv;
      void       *plan;
      plperl_query_desc *qdesc;

!     sv = hv_fetch_string(plperl_query_hash, query);
!     if (sv == NULL)
!         elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
!     if (*sv == NULL || !SvOK(*sv))
!         elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value corrupted");

-     qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
      if (qdesc == NULL)
          elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");

--- 2507,2523 ----
  void
  plperl_spi_freeplan(char *query)
  {
      void       *plan;
      plperl_query_desc *qdesc;
+     plperl_query_entry *hash_entry;

!     hash_entry = hash_search(plperl_query_hash, query,
!                                          HASH_FIND,NULL);
!     if (hash_entry == NULL)
!         elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
!
!     qdesc = hash_entry->query_data;

      if (qdesc == NULL)
          elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");

***************
*** 2353,2359 ****
       * free all memory before SPI_freeplan, so if it dies, nothing will be
       * left over
       */
!     hv_delete(plperl_query_hash, query, strlen(query), G_DISCARD);
      plan = qdesc->plan;
      free(qdesc->argtypes);
      free(qdesc->arginfuncs);
--- 2525,2533 ----
       * free all memory before SPI_freeplan, so if it dies, nothing will be
       * left over
       */
!     hash_search(plperl_query_hash, query,
!                 HASH_REMOVE,NULL);
!
      plan = qdesc->plan;
      free(qdesc->argtypes);
      free(qdesc->arginfuncs);