Deferred trigger queue to disk - Mailing list pgsql-patches

From deststar
Subject Deferred trigger queue to disk
Date
Msg-id 3EFCB46D.2020803@blueyonder.co.uk
Whole thread Raw
List pgsql-patches
All this does is put all the trigger events given to it after a certain
number out to disk in DeferredTriggerAddEvent; then read them as
necessary in DeferredTriggerInvokeExec.
The certain number after which it moves to disk is currently, &
arbiterily, 100000 defined by this line in trigger.h:
#define max_deferred_trigger_mem_queue  100000
It has passed the regression checks on Linux (RH9ish) and cygwin.
Also include is a quick test script I used
This is my first patch of any size so please review carefully.
regards,
- Stuart
*** trigger.c.orig    Wed Jun 25 19:39:26 2003
--- trigger.c    Fri Jun 27 20:50:20 2003
***************
*** 38,43 ****
--- 38,53 ----
  #include "utils/lsyscache.h"
  #include "utils/syscache.h"

+ #ifndef PG_TEMP_FILE_PREFIX
+ #define PG_TEMP_FILE_PREFIX "pgsql_tmp"
+ #endif
+
+ static void deferredTriggerClean();
+ static DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent();
+ static void deferredTriggerDiskImmWriteDoneEvent(DeferredTriggerEvent event);
+ static void deferredTriggerDiskImmClean();
+ static void deferredTriggerDiskImmReset();
+ static void deferredTriggerWriteEvent(DeferredTriggerEvent event);

  static void InsertTrigger(TriggerDesc *trigdesc, Trigger *trigger, int indx);
  static HeapTuple GetTupleForTrigger(EState *estate,
***************
*** 1601,1606 ****
--- 1611,1620 ----
   * ----------
   */

+ /*
+  * Note count is not a correct count>=max_deferred_trigger_mem_queue
+  */
+
  typedef struct DeferredTriggersData {
          /* Internal data is held in a per-transaction memory context */
      MemoryContext            deftrig_cxt;
***************
*** 1613,1618 ****
--- 1627,1642 ----
      DeferredTriggerEvent    deftrig_events;
      DeferredTriggerEvent    deftrig_events_imm;
      DeferredTriggerEvent    deftrig_event_tail;
+     int                     num_files;
+     long                    *bytes_written;
+     FILE                    *file_handle;
+     long                    count;
+     char                    *filename;
+     FILE                    *imm_file_handle;
+     long                     imm_filepos;
+     int                     imm_filenum;
+     char                    *imm_filename;
+     int                     imm_szrd;
  } DeferredTriggersData;

  /* ----------
***************
*** 1638,1643 ****
--- 1662,2009 ----

  static DeferredTriggers    deferredTriggers;

+ /*
+  * Records an event done back into the disk queue
+  * requires event passed to it for speed rather than
+  * rereading it, then marking as done
+  * also writes the entire event to make sure nothing is missed
+  * assumes event size is the same as original
+  */
+ static void deferredTriggerDiskImmWriteDoneEvent(DeferredTriggerEvent event)
+ {
+ int bw,rt,eln,c;
+ FILE *fp;
+ void *buf,*cbp;
+ eln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1)+
+  (event->dte_n_items*sizeof(DeferredTriggerEventItem));
+ buf=palloc(eln);
+ if (!(buf))
+     elog(ERROR, "Could not allocate buffer for writing deferred event done");
+ fp=deferredTriggers->imm_file_handle;
+ if (!(fp))
+     elog(ERROR, "NULL file handle for done deferred trigger write");
+ rt=fseek(fp,deferredTriggers->imm_filepos,SEEK_SET);
+ if (rt!=0)
+     elog(ERROR,"Unable to seek for write done deferred trigger. Err=%i",errno);
+ *((int32 *) buf)=event->dte_event;
+ cbp=buf+sizeof(int32);
+ *((Oid *) cbp)=event->dte_relid;
+ cbp+=sizeof(Oid);
+ *((ItemPointerData *) cbp)=event->dte_oldctid;
+ cbp+=sizeof(ItemPointerData);
+ *((ItemPointerData *) cbp)=event->dte_newctid;
+ cbp+=sizeof(ItemPointerData);
+ *((int32 *) cbp)=event->dte_n_items;
+ cbp+=sizeof(int32);
+ /*
+  * copying all the items to the buffer
+  */
+ for(c=0;c<event->dte_n_items;c++)
+     {
+     *((DeferredTriggerEventItem *) cbp)=event->dte_item[c];
+     cbp+=sizeof(DeferredTriggerEventItem);
+     }
+ bw=fwrite(buf,1,eln,fp);
+ if (bw!=eln)
+     elog(ERROR,"deferred trigger write done should have returned %i, actual %i",
+      bw,eln);
+ rt=fseek(fp,deferredTriggers->imm_filepos +
+  deferredTriggers->imm_szrd,SEEK_SET);
+ if (rt!=0)
+     elog(ERROR,"Unable to seek after done deferred trigger. Err=%i",errno);
+ }
+
+ /*
+  * DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent()
+  *
+  * Returns the next deferred trigger event from the disk queue.
+  * must be initialised with deferredTriggerDiskImmReset()
+  */
+ static DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent()
+ {
+ void *buf,*cbp;
+ int rdln,bufln,c,rt;
+ DeferredTriggerEvent event,oevent;
+ if ((deferredTriggers->imm_filenum<1) || (!(deferredTriggers->imm_file_handle)))
+     elog(ERROR,
+      "Attempt to read from disk deferred trigger queue before initialisation.");
+ /*
+  * Check to see if we need to advance to the next file
+  */
+ deferredTriggers->imm_filepos+=deferredTriggers->imm_szrd;
+ deferredTriggers->imm_szrd=0;
+
+ if (deferredTriggers->imm_filepos>=
+  *(deferredTriggers->bytes_written+deferredTriggers->imm_filenum-1))
+     {
+     /*
+      * Check to see if no more files. Return null (no more events)
+      */
+     if (deferredTriggers->imm_file_handle!=deferredTriggers->file_handle)
+         {
+         fclose(deferredTriggers->imm_file_handle);
+         deferredTriggers->imm_file_handle=NULL;
+         }
+     else
+         deferredTriggers->imm_file_handle=NULL;
+     if (deferredTriggers->imm_filenum==deferredTriggers->num_files)
+         return NULL;
+     /*
+      * otherwise advance to next file
+      */
+     deferredTriggers->imm_filenum++;
+     sprintf(deferredTriggers->imm_filename+
+      strlen(deferredTriggers->imm_filename)-9,"%09d",
+      deferredTriggers->imm_filenum);
+     if (strcmp(deferredTriggers->imm_filename,deferredTriggers->filename)==0)
+         deferredTriggers->imm_file_handle = deferredTriggers->file_handle;
+     else
+         deferredTriggers->imm_file_handle=
+          fopen(deferredTriggers->imm_filename, "rb+");
+     if (!(deferredTriggers->imm_file_handle))
+         elog(ERROR, "Can not open for reading first disk event file handle %s"
+          ,deferredTriggers->imm_filename);
+     deferredTriggers->imm_filepos=0;
+     }
+ event=buf=NULL;
+ event=palloc(sizeof(DeferredTriggerEventData));
+ if (!(event))
+     elog(ERROR,"Can not allocate event for deferred trigger queue disk read.");
+ bufln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1);
+ buf=palloc(bufln);
+ if (!(event))
+     elog(ERROR,"Can not allocate buf for deferred trigger queue disk read.");
+ rt=fseek(deferredTriggers->imm_file_handle, deferredTriggers->imm_filepos,
+  SEEK_SET);
+ if (rt!=0)
+     elog(ERROR,"Unable to seek for trigger at %i. Err=%i",
+      deferredTriggers->imm_filepos,errno);
+ rdln=fread(buf,1,bufln,deferredTriggers->imm_file_handle);
+ if (rdln!=bufln)
+     elog(ERROR,"deferred trigger fread should have returned %i, actual %i. Err=%i. imm_filepos=%i, bytes_written=%i,
ftell=%i",
+      bufln,rdln,errno,(int) deferredTriggers->imm_filepos
+      ,(int) *(deferredTriggers->bytes_written+deferredTriggers->imm_filenum-1)
+      ,(int) ftell(deferredTriggers->imm_file_handle));
+ deferredTriggers->imm_szrd=bufln;
+ event->dte_event=*((int32 *) buf);
+ cbp=buf+sizeof(int32);
+ event->dte_relid=*((Oid *) cbp);
+ cbp+=sizeof(Oid);
+ event->dte_oldctid=*((ItemPointerData *) cbp);
+ cbp+=sizeof(ItemPointerData);
+ event->dte_newctid=*((ItemPointerData *) cbp);
+ cbp+=sizeof(ItemPointerData);
+ event->dte_n_items=*((int32 *) cbp);
+ pfree(buf);
+ bufln=event->dte_n_items*sizeof(DeferredTriggerEventItem);
+ buf=palloc(bufln);
+ if (!(event))
+     elog(ERROR,"Can not allocate buf for deferred trigger queue disk read.");
+ rdln=fread(buf,1,bufln,deferredTriggers->imm_file_handle);
+ if (rdln!=bufln)
+     elog(ERROR,"deferred trigger fread should have returned %i, actual %i",
+      bufln,rdln);
+ deferredTriggers->imm_szrd+=bufln;
+ if (event->dte_n_items>1)
+     {
+     oevent=event;
+     event=palloc(sizeof(DeferredTriggerEventData)+
+      sizeof(DeferredTriggerEventItem));
+     if (!(event->dte_item))
+         elog(ERROR,
+          "Could not allocate event->dte_item for deferred trigger disk read");
+     *event=*oevent;
+     pfree(oevent);
+     }
+ for(c=0,cbp=buf;c<event->dte_n_items;c++,cbp+=sizeof(DeferredTriggerEventItem))
+    event->dte_item[c]=*((DeferredTriggerEventItem *) cbp);
+ event->dte_next=NULL;
+ pfree(buf);
+ return event;
+ }
+
+
+ /*
+  * Cleans up after DiskImm Stuff
+  */
+ static void deferredTriggerDiskImmClean()
+ {
+ deferredTriggers->imm_szrd=0;
+ deferredTriggers->imm_filenum=0;
+ deferredTriggers->imm_filepos=0;
+ if (deferredTriggers->imm_file_handle)
+     {
+     if (deferredTriggers->imm_file_handle==deferredTriggers->file_handle)
+         deferredTriggers->imm_file_handle=NULL;
+     else
+         {
+         fclose(deferredTriggers->imm_file_handle);
+         deferredTriggers->imm_file_handle=NULL;
+         }
+     }
+ }
+
+ /*
+  * Cleans up everything that disk deferred trigger queue might have added
+  * (inc free memory & rm files).
+  */
+ static void deferredTriggerClean()
+ {
+ int c,rt;
+ deferredTriggers->count=0;
+ if (deferredTriggers->num_files<1)
+     return;
+
+ deferredTriggerDiskImmClean();
+
+ if (deferredTriggers->bytes_written)
+     {
+     pfree(deferredTriggers->bytes_written);
+     deferredTriggers->bytes_written=NULL;
+     }
+ fclose(deferredTriggers->file_handle);
+ deferredTriggers->file_handle=NULL;
+ for(c=1;c<=deferredTriggers->num_files;c++)
+     {
+     sprintf(deferredTriggers->filename+strlen(deferredTriggers->filename)-9,
+      "%09d", c);
+     rt=remove(deferredTriggers->filename);
+     if (rt!=0)
+         elog(DEBUG1,"Failed to remove %s. Err=%i"
+          ,deferredTriggers->filename,errno);
+     else
+         elog(DEBUG1,"Succesfully removed %s",deferredTriggers->filename);
+     }
+ if (deferredTriggers->imm_filename)
+     {
+     pfree(deferredTriggers->imm_filename);
+     deferredTriggers->imm_filename=NULL;
+     }
+ if (deferredTriggers->filename)
+     {
+     pfree(deferredTriggers->filename);
+     deferredTriggers->filename=NULL;
+     }
+
+ deferredTriggers->num_files=0;
+ }
+
+ /*
+  * Initialises the stuff for reading disk deferred triggers.
+  */
+ static void deferredTriggerDiskImmReset()
+ {
+ deferredTriggers->imm_szrd=0;
+ deferredTriggers->imm_filepos=0;
+ deferredTriggers->imm_filenum=1;
+
+ if (deferredTriggers->imm_file_handle)
+     {
+     if (deferredTriggers->imm_file_handle==deferredTriggers->file_handle)
+         deferredTriggers->imm_file_handle=NULL;
+     else
+         {
+         fclose(deferredTriggers->imm_file_handle);
+         deferredTriggers->imm_file_handle=NULL;
+         }
+     }
+ sprintf(deferredTriggers->imm_filename,"%s/pgsql_tmp/%sdeftrig_%i-000000001",
+  DatabasePath,PG_TEMP_FILE_PREFIX,(int) GetCurrentTransactionId());
+ if (strcmp(deferredTriggers->imm_filename,deferredTriggers->filename)==0)
+     deferredTriggers->imm_file_handle = deferredTriggers->file_handle;
+ else
+     deferredTriggers->imm_file_handle
+      = fopen(deferredTriggers->imm_filename, "rb+");
+ if (!(deferredTriggers->imm_file_handle))
+     elog(ERROR, "Can not open for reading first disk event file handle %s"
+      ,deferredTriggers->imm_filename);
+ }
+
+ /*
+  *  deferredTriggerWriteEvent(DeferredTriggerEvent event)
+  *
+  *  Writes to disk. Will automatically go to the next file
+  *  but needs to have the first one already made
+  */
+ static void deferredTriggerWriteEvent(DeferredTriggerEvent event)
+ {
+ int eln,c,bw;
+ long *n;
+ void *buf,*cbp;
+ eln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1)+
+  (event->dte_n_items*sizeof(DeferredTriggerEventItem));
+ buf=palloc(eln);
+ if (!(buf))
+     elog(ERROR, "Could not allocated deferred trigger write buffer");
+ if (eln+*(deferredTriggers->bytes_written+
+  deferredTriggers->num_files-1)>1073741823)
+     {
+     deferredTriggers->num_files++;
+     n=(long *) MemoryContextAlloc(TopTransactionContext,
+      sizeof(long)*(deferredTriggers->num_files));
+     if (!n)
+         elog(ERROR, "Could not expand deferred trigger bytes written array");
+     for(c=0;c<deferredTriggers->num_files-1;c++)
+         *(n+c)=*(deferredTriggers->bytes_written+c);
+     pfree(deferredTriggers->bytes_written);
+     deferredTriggers->bytes_written=n;
+     *(deferredTriggers->bytes_written + deferredTriggers->num_files-1)=0;
+     if (deferredTriggers->file_handle!=deferredTriggers->imm_file_handle)
+         {
+         fclose(deferredTriggers->file_handle);
+         deferredTriggers->file_handle=NULL;
+         }
+     else
+         deferredTriggers->file_handle=NULL;
+     sprintf(deferredTriggers->filename+strlen(deferredTriggers->filename)-9,
+      "%09d", c);
+     deferredTriggers->file_handle=fopen(deferredTriggers->filename, "wb");
+     if (!(deferredTriggers->file_handle))
+         elog(ERROR, "Can not open deferred trigger event file %s"
+          ,deferredTriggers->filename);
+     }
+ /*
+  * paranoid check to see if there is a file handle (should always be)
+  */
+ if (!(deferredTriggers->file_handle))
+     {
+     elog(DEBUG1, "No File handle for deferred trigger file - should be here");
+     deferredTriggers->file_handle=fopen(deferredTriggers->filename, "wb");
+     if (!(deferredTriggers->file_handle))
+         elog(ERROR, "Can not open deferred trigger event file %s"
+          ,deferredTriggers->filename);
+     }
+ /*
+  * copy the data to a buffer
+  */
+ *((int32 *) buf)=event->dte_event;
+ cbp=buf+sizeof(int32);
+ *((Oid *) cbp)=event->dte_relid;
+ cbp+=sizeof(Oid);
+ *((ItemPointerData *) cbp)=event->dte_oldctid;
+ cbp+=sizeof(ItemPointerData);
+ *((ItemPointerData *) cbp)=event->dte_newctid;
+ cbp+=sizeof(ItemPointerData);
+ *((int32 *) cbp)=event->dte_n_items;
+ cbp+=sizeof(int32);
+ /*
+  * copying all the items to the buffer
+  */
+ for(c=0;c<event->dte_n_items;c++)
+     {
+     *((DeferredTriggerEventItem *) cbp)=event->dte_item[c];
+     cbp+=sizeof(DeferredTriggerEventItem);
+     }
+ if (fseek(deferredTriggers->file_handle,0,SEEK_END)!=0)
+     elog(ERROR,"Could not seek end of file for write event");
+ bw=fwrite(buf,1,eln,deferredTriggers->file_handle);
+ if (bw!=eln)
+     elog(ERROR,"deferred trigger write should have returned %i, actual %i",
+      bw,eln);
+ *(deferredTriggers->bytes_written+deferredTriggers->num_files-1)+=bw;
+ // fflush(deferredTriggers->file_handle);
+ }
+
  /* ----------
   * deferredTriggerCheckState()
   *
***************
*** 1705,1727 ****
  static void
  deferredTriggerAddEvent(DeferredTriggerEvent event)
  {
!     /*
!      * Since the event list could grow quite long, we keep track of the
!      * list tail and append there, rather than just doing a stupid
!      * "lappend". This avoids O(N^2) behavior for large numbers of events.
!      */
!     event->dte_next = NULL;
!     if (deferredTriggers->deftrig_event_tail == NULL)
!     {
!         /* first list entry */
!         deferredTriggers->deftrig_events = event;
!         deferredTriggers->deftrig_event_tail = event;
!     }
      else
!     {
!         deferredTriggers->deftrig_event_tail->dte_next = event;
!         deferredTriggers->deftrig_event_tail = event;
!     }
  }


--- 2071,2159 ----
  static void
  deferredTriggerAddEvent(DeferredTriggerEvent event)
  {
! char *dskpth;
! /*
!  * Since the event list could grow quite long, we keep track of the
!  * list tail and append there, rather than just doing a stupid
!  * "lappend". This avoids O(N^2) behavior for large numbers of events.
!  */
! event->dte_next = NULL;
! if (deferredTriggers->deftrig_event_tail == NULL)
!    {
!     /* first list entry */
!     deferredTriggers->count=0;
!     deferredTriggers->num_files=0;
!     deferredTriggers->deftrig_events = event;
!     deferredTriggers->deftrig_event_tail = event;
!    }
! else
!    {
!     if (deferredTriggers->count<max_deferred_trigger_mem_queue)
!         {
!         deferredTriggers->deftrig_event_tail->dte_next = event;
!         deferredTriggers->deftrig_event_tail = event;
!         deferredTriggers->count++;
!         }
!     /*
!      * Record the rest of the deferred triggers to disk
!      */
      else
!         {
!         if (deferredTriggers->count==max_deferred_trigger_mem_queue)
!             {
!             /*
!              * Use a false trigger event to say this is the end of mem store
!              */
!             elog(DEBUG1, "Createing disk queue for xid %i",
!              (int) GetCurrentTransactionId());
!             deferredTriggers->count++;
!             deferredTriggers->num_files=1;
!             sprintf(deferredTriggers->filename,
!              "%s/pgsql_tmp/%sdeftrig_%i-000000001",
!              DatabasePath,PG_TEMP_FILE_PREFIX,(int) GetCurrentTransactionId());
!             deferredTriggers->bytes_written=(long *)
!              MemoryContextAlloc(TopTransactionContext, sizeof(long));
!             if (!(deferredTriggers->bytes_written))
!                 elog(ERROR,
!                  "Could not create deferred trigger bytes written array");
!             *deferredTriggers->bytes_written=0;
!             deferredTriggers->file_handle
!              =fopen(deferredTriggers->filename, "wb+");
!             if (deferredTriggers->file_handle==NULL)
!                 {
!                 dskpth=palloc(strlen(DatabasePath)+11);
!                 if (!(dskpth))
!                     elog(ERROR, "Can not allocate disk deferred trigger path");
!                 sprintf(dskpth,"%s/pgsql_tmp",DatabasePath);
!                 if (mkdir(dskpth,S_IREAD | S_IWRITE)==0)
!                     elog(DEBUG1, "Created temp directory %s",dskpth);
!                 pfree(dskpth);
!                 dskpth=NULL;
!                 deferredTriggers->file_handle
!                  = fopen(deferredTriggers->filename, "wb");
!                 if (!(deferredTriggers->file_handle))
!                     elog(ERROR,
!                      "Can not open first disk event file handle for %s"
!                      ,deferredTriggers->filename);
!                 elog(DEBUG1, "Created disk queue %s"
!                  ,deferredTriggers->filename);
!                 }
!             else
!                 elog(DEBUG1, "Created disk queue file %s"
!                  ,deferredTriggers->filename);
!             deferredTriggerWriteEvent(event);
!             event->dte_event=TRIGGER_DEFERRED_ON_DISK;
!             event->dte_next=NULL;
!             deferredTriggers->deftrig_event_tail->dte_next = event;
!             deferredTriggers->deftrig_event_tail = event;
!             }
!         else
!             {
!             deferredTriggerWriteEvent(event);
!             pfree(event);
!             }
!         }
!     }
  }


***************
*** 1869,1882 ****
       * the new states.  See DeferredTriggerSetState.)
       */

-     /* Make a per-tuple memory context for trigger function calls */
-     per_tuple_context =
-         AllocSetContextCreate(CurrentMemoryContext,
-                               "DeferredTriggerTupleContext",
-                               ALLOCSET_DEFAULT_MINSIZE,
-                               ALLOCSET_DEFAULT_INITSIZE,
-                               ALLOCSET_DEFAULT_MAXSIZE);
-
      /*
       * If immediate_only is true, then the only events that could need firing
       * are those since deftrig_events_imm.  (But if deftrig_events_imm is
--- 2301,2306 ----
***************
*** 1885,1903 ****
      if (immediate_only && deferredTriggers->deftrig_events_imm != NULL)
      {
          prev_event = deferredTriggers->deftrig_events_imm;
!         event = prev_event->dte_next;
      }
      else
      {
          prev_event = NULL;
          event = deferredTriggers->deftrig_events;
      }

      while (event != NULL)
      {
          bool        still_deferred_ones = false;
          DeferredTriggerEvent next_event;
          int            i;

          /*
           * Check if event is already completely done.
--- 2309,2359 ----
      if (immediate_only && deferredTriggers->deftrig_events_imm != NULL)
      {
          prev_event = deferredTriggers->deftrig_events_imm;
!         if (prev_event->dte_event!=TRIGGER_DEFERRED_ON_DISK)
!             {
!             if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
!                 {
!                 prev_event=event;
!                 deferredTriggerDiskImmReset();
!                 }
!             else
!                 {
!                 event = prev_event->dte_next;
!                 if (deferredTriggers->num_files>0)
!                     deferredTriggerDiskImmReset();
!                 }
!             }
      }
      else
      {
          prev_event = NULL;
          event = deferredTriggers->deftrig_events;
+         if (event)
+             if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+                 {
+                 prev_event=event;
+                 deferredTriggerDiskImmReset();
+                 }
+         if (deferredTriggers->num_files>0)
+             deferredTriggerDiskImmReset();
      }
+     /* Make a per-tuple memory context for trigger function calls */
+     per_tuple_context =
+         AllocSetContextCreate(CurrentMemoryContext,
+                               "DeferredTriggerTupleContext",
+                               ALLOCSET_DEFAULT_MINSIZE,
+                               ALLOCSET_DEFAULT_INITSIZE,
+                               ALLOCSET_DEFAULT_MAXSIZE);

+     if (prev_event)
+         if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+             event=deferredTriggerDiskImmReadNextEvent();
      while (event != NULL)
      {
          bool        still_deferred_ones = false;
          DeferredTriggerEvent next_event;
          int            i;
+         int         any_dn;

          /*
           * Check if event is already completely done.
***************
*** 1910,1915 ****
--- 2366,2372 ----
              /*
               * Check each trigger item in the event.
               */
+             any_dn=0;
              for (i = 0; i < event->dte_n_items; i++)
              {
                  if (event->dte_item[i].dti_state & TRIGGER_DEFERRED_DONE)
***************
*** 1966,1972 ****
--- 2423,2433 ----
                                         per_tuple_context);

                  event->dte_item[i].dti_state |= TRIGGER_DEFERRED_DONE;
+                 any_dn=1;
              }                    /* end loop over items within event */
+             if ((prev_event)&&(any_dn))
+                 if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+                     deferredTriggerDiskImmWriteDoneEvent(event);
          }

          /*
***************
*** 1977,1987 ****
           * we have to be careful about maintaining list validity here.
           */
          next_event = event->dte_next;

          if (still_deferred_ones)
          {
              /* Not done, keep in list */
!             prev_event = event;
          }
          else
          {
--- 2438,2455 ----
           * we have to be careful about maintaining list validity here.
           */
          next_event = event->dte_next;
+         if (prev_event)
+             if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+                 next_event = deferredTriggerDiskImmReadNextEvent();

          if (still_deferred_ones)
          {
              /* Not done, keep in list */
!             if (prev_event==NULL)
!                 prev_event=event;
!             else
!                 if (prev_event->dte_event!=TRIGGER_DEFERRED_ON_DISK)
!                     prev_event = event;
          }
          else
          {
***************
*** 1989,1999 ****
              if (immediate_only)
              {
                  /* delink it from list and free it */
!                 if (prev_event)
                      prev_event->dte_next = next_event;
                  else
                      deferredTriggers->deftrig_events = next_event;
!                 pfree(event);
              }
              else
              {
--- 2457,2468 ----
              if (immediate_only)
              {
                  /* delink it from list and free it */
!                 if (prev_event)
                      prev_event->dte_next = next_event;
                  else
                      deferredTriggers->deftrig_events = next_event;
!                 pfree(event);
!                 event=NULL;
              }
              else
              {
***************
*** 2002,2011 ****
                   * mark the event done.
                   */
                  event->dte_event |= TRIGGER_DEFERRED_DONE;
              }
          }
!
          event = next_event;
      }

      /* Update list tail pointer in case we just deleted tail event */
--- 2471,2492 ----
                   * mark the event done.
                   */
                  event->dte_event |= TRIGGER_DEFERRED_DONE;
+                 if (prev_event)
+                     if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+                         deferredTriggerDiskImmWriteDoneEvent(event);
              }
          }
!         if ((prev_event)&&(event))
!             if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
!                 pfree(event);
!
          event = next_event;
+         if (event!=NULL)
+             if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+                 {
+                 prev_event=event;
+                 event = deferredTriggerDiskImmReadNextEvent();
+                 }
      }

      /* Update list tail pointer in case we just deleted tail event */
***************
*** 2057,2062 ****
--- 2538,2547 ----
      MemoryContext cxt = TopTransactionContext;
      deferredTriggers = (DeferredTriggers) MemoryContextAlloc(TopTransactionContext,
              sizeof(DeferredTriggersData));
+     deferredTriggers->filename = (char *)
+      MemoryContextAlloc(TopTransactionContext, strlen(DatabasePath)+128);
+     deferredTriggers->imm_filename = (char *)
+      MemoryContextAlloc(TopTransactionContext, strlen(DatabasePath)+128);

      /*
       * Create the per transaction memory context
***************
*** 2076,2081 ****
--- 2561,2574 ----
      deferredTriggers->deftrig_events = NULL;
      deferredTriggers->deftrig_events_imm = NULL;
      deferredTriggers->deftrig_event_tail = NULL;
+     deferredTriggers->num_files=0;
+     deferredTriggers->bytes_written=NULL;
+     deferredTriggers->file_handle=NULL;
+     deferredTriggers->count=0;
+     deferredTriggers->imm_file_handle=NULL;
+     deferredTriggers->imm_filepos=0;
+     deferredTriggers->imm_filenum=0;
+     deferredTriggers->imm_szrd=0;
  }


***************
*** 2094,2100 ****
       */
      if (deferredTriggers == NULL)
          return;
-
      deferredTriggerInvokeEvents(true);
  }

--- 2587,2592 ----
***************
*** 2115,2122 ****
      if (deferredTriggers == NULL)
          return;

!     deferredTriggerInvokeEvents(false);
!
      deferredTriggers = NULL;
  }

--- 2607,2619 ----
      if (deferredTriggers == NULL)
          return;

!
!     if (deferredTriggers->num_files>0)
!         deferredTriggerDiskImmReset();
!
!     deferredTriggerInvokeEvents(true);
!     deferredTriggerDiskImmClean();
!     deferredTriggerClean();
      deferredTriggers = NULL;
  }

***************
*** 2141,2146 ****
--- 2638,2645 ----
      /*
       * Forget everything we know about deferred triggers.
       */
+     deferredTriggerDiskImmClean();
+     deferredTriggerClean();
      deferredTriggers = NULL;
  }

*** trigger.h.orig    Wed Jun 25 19:40:16 2003
--- trigger.h    Thu Jun 26 10:06:56 2003
***************
*** 16,21 ****
--- 16,22 ----
  #include "nodes/execnodes.h"
  #include "nodes/parsenodes.h"

+ #define max_deferred_trigger_mem_queue  100000
  /*
   * TriggerData is the node type that is passed as fmgr "context" info
   * when a function is called by the trigger manager.
***************
*** 50,55 ****
--- 51,57 ----
  #define TRIGGER_DEFERRED_DEFERRABLE        0x00000040
  #define TRIGGER_DEFERRED_INITDEFERRED    0x00000080
  #define TRIGGER_DEFERRED_HAS_BEFORE        0x00000100
+ #define TRIGGER_DEFERRED_ON_DISK        0x00000200

  #define TRIGGER_FIRED_BY_INSERT(event)    \
          (((TriggerEvent) (event) & TRIGGER_EVENT_OPMASK) == \
DROP TABLE trig_tst1;
CREATE TABLE trig_tst1(
    pk    serial,
    vli    int4,
    vlt    text
    );

DROP TABLE trig_tst2;
CREATE TABLE trig_tst2(
    opr    int4,
    npk    int4,
    opk    int4,
    vli    int4,
    vlt    text
    );

CREATE OR REPLACE FUNCTION tf_ins() RETURNS TRIGGER AS '
BEGIN
    INSERT INTO trig_tst2(opr,npk,vli,vlt) VALUES (1,NEW.pk,NEW.vli,NEW.vlt);
    RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;

CREATE OR REPLACE FUNCTION tf_updt() RETURNS TRIGGER AS '
BEGIN
    INSERT INTO trig_tst2(opr,npk,opk,vli,vlt) VALUES (2,NEW.pk,OLD.pk,NEW.vli,NEW.vlt);
    RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;

CREATE OR REPLACE FUNCTION tf_DEL() RETURNS TRIGGER AS '
BEGIN
    INSERT INTO trig_tst2(opr,opk) VALUES (3,OLD.pk);
    RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;

INSERT INTO trig_tst1 (vli,vlt) VALUES (1,'tst');
INSERT INTO trig_tst1 (vli,vlt) VALUES (2,'abc');
INSERT INTO trig_tst1 (vli,vlt) VALUES (3,'xyz');
INSERT INTO trig_tst1 (vli,vlt) VALUES (0,'bob');
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+4,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+8,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+16,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+32,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+64,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+128,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+256,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+512,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+1024,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+2048,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+4096,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+8192,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+16384,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+32768,vlt FROM trig_tst1;

CREATE TRIGGER tst_tf_ins AFTER INSERT ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_ins();
CREATE TRIGGER tst_tf_updt AFTER UPDATE ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_updt();
CREATE TRIGGER tst_tf_del AFTER DELETE ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_del();

INSERT INTO trig_tst1 (vli,vlt) SELECT vli+65536,vlt FROM trig_tst1;

SELECT count(*) FROM trig_tst2;

INSERT INTO trig_tst1 (vli,vlt) SELECT vli+131072,vlt FROM trig_tst1;

SELECT count(*) FROM trig_tst2;

TRUNCATE trig_tst2;

UPDATE trig_tst1 SET vli=vli*2;

TRUNCATE trig_tst2;



pgsql-patches by date:

Previous
From: Peter Eisentraut
Date:
Subject: Re: array support patch phase 1 patch
Next
From: Devrim GUNDUZ
Date:
Subject: .pot files are unavailable (?)