Re: [HACKERS] COPYable logs status - Mailing list pgsql-patches

From Andrew Dunstan
Subject Re: [HACKERS] COPYable logs status
Date
Msg-id 466F2E8D.4020808@dunslane.net
Whole thread Raw
Responses Re: [HACKERS] COPYable logs status
List pgsql-patches

Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>
>> I'll try to get a patch out for just the stderr case, which should be
>> back-patchable, then adjust the CSVlog patch to use it.
>>
>
> Sounds like a plan.
>
>
>> I'm thinking of handling the partial lines with a small dynahash of
>> StringInfo buffers, which get discarded whenever we don't have a partial
>> line for the PID.
>>
>
> A hashtable might be overkill --- based on reports so far, it's unlikely
> you'd have more than two or three messages being received concurrently,
> so a simple list or array might be quicker to search.
>
>
>



Attached is a WIP patch ... I still have some debugging to do but I
think the basic logic is there. Comments welcome.

ATM it gets stuck while running installcheck, and gdb shows the logger
hanging here:

enlargeStringInfo (str=0x9a91c8, needed=4085) at stringinfo.c:263
263                     newlen = 2 * newlen;


Can I not use a StringInfo in the syslogger?

cheers

andrew
Index: src/backend/postmaster/syslogger.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/syslogger.c,v
retrieving revision 1.31
diff -c -r1.31 syslogger.c
*** src/backend/postmaster/syslogger.c    4 Jun 2007 22:21:42 -0000    1.31
--- src/backend/postmaster/syslogger.c    12 Jun 2007 23:23:38 -0000
***************
*** 42,47 ****
--- 42,48 ----
  #include "utils/guc.h"
  #include "utils/ps_status.h"
  #include "utils/timestamp.h"
+ #include "lib/stringinfo.h"

  /*
   * We really want line-buffered mode for logfile output, but Windows does
***************
*** 54,59 ****
--- 55,80 ----
  #define LBF_MODE    _IOLBF
  #endif

+ /*
+  * Size of a single read from the logger pipe.  It is never less than
+  * PIPE_BUF, so a whole protocol chunk (at most PIPE_BUF bytes) always fits.
+  */
+ #if PIPE_BUF > 1024
+ #define READ_SIZE PIPE_BUF
+ #else
+ #define READ_SIZE 1024
+ #endif
+
+ /*
+  * We use a buffer twice as big as a read so that if there is a fragment left
+  * after processing what was read we can save it and copy it back in front of
+  * the next read.
+  */
+ #define READ_BUF_SIZE (2 * READ_SIZE)
+
+ /* buffer to keep any partial chunks read between calls to read()/ReadFile() */
+ static char read_fragment[READ_SIZE];
+ static int read_fragment_len = 0;

  /*
   * GUC parameters.    Redirect_stderr cannot be changed after postmaster
***************
*** 75,89 ****
--- 92,118 ----
   * Private state
   */
  static pg_time_t next_rotation_time;

  static bool redirection_done = false;

  static bool pipe_eof_seen = false;

  static FILE *syslogFile = NULL;

  static char *last_file_name = NULL;

+ /*
+  * Per-process reassembly buffers for chunked messages.  A slot whose pid is
+  * 0 is free; non-final chunks are appended to the sending process's slot
+  * until that process's final chunk arrives.
+  */
+ typedef struct
+ {
+     pid_t pid;
+     StringInfoData data;
+ } save_buffer;
+ #define CHUNK_SLOTS 20
+ static save_buffer saved_chunks[CHUNK_SLOTS];
+
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
  int            syslogPipe[2] = {-1, -1};
***************
*** 117,123 ****
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
!

  /*
   * Main entry point for syslogger process
--- 138,144 ----
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
! static void write_chunk(const char * buffer, int count);    /* unframed write; former write_syslogger_file body */

  /*
   * Main entry point for syslogger process
***************
*** 244,250 ****
          bool        time_based_rotation = false;

  #ifndef WIN32
!         char        logbuffer[1024];
          int            bytesRead;
          int            rc;
          fd_set        rfds;
--- 265,271 ----
          bool        time_based_rotation = false;

  #ifndef WIN32
!         char        logbuffer[READ_BUF_SIZE];
          int            bytesRead;
          int            rc;
          fd_set        rfds;
***************
*** 325,332 ****
          }
          else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
          {
              bytesRead = piperead(syslogPipe[0],
!                                  logbuffer, sizeof(logbuffer));

              if (bytesRead < 0)
              {
--- 346,354 ----
          }
          else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
          {
+             memcpy(logbuffer, read_fragment, read_fragment_len);
              bytesRead = piperead(syslogPipe[0],
!                                  logbuffer + read_fragment_len, READ_SIZE);

              if (bytesRead < 0)
              {
***************
*** 337,343 ****
              }
              else if (bytesRead > 0)
              {
!                 write_syslogger_file(logbuffer, bytesRead);
                  continue;
              }
              else
--- 359,365 ----
              }
              else if (bytesRead > 0)
              {
!                 write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
                  continue;
              }
              else
***************
*** 349,354 ****
--- 371,380 ----
                   * and all backends are shut down, and we are done.
                   */
                  pipe_eof_seen = true;
+
+                 /* if there's a fragment left then force it out now */
+                 if (read_fragment_len)
+                     write_chunk(read_fragment, read_fragment_len);
              }
          }
  #else                            /* WIN32 */
***************
*** 626,631 ****
--- 652,792 ----
  void
  write_syslogger_file(const char *buffer, int count)
  {
+     /*
+      * Split the buffer into protocol chunks and plain (unframed) text.
+      * Non-final chunks are accumulated per pid in saved_chunks[]; a final
+      * chunk writes out whatever was accumulated for its pid.  A trailing
+      * partial chunk, or a tail too short to hold a header, is stashed in
+      * read_fragment so that the caller can prepend it to the next read.
+      */
+     char *cursor = (char *) buffer;
+     int  chunklen;
+     PipeProto p;
+     while (count > 0)
+     {
+         /* not enough data even for a header? save it until we get more */
+         if (count < (int) PIPE_DATA_OFFSET)
+         {
+             memcpy(read_fragment, cursor, count);
+             read_fragment_len = count;
+             return;
+         }
+         memcpy(&p, cursor, PIPE_DATA_OFFSET);    /* protocol chunk header? */
+         if (p.nuls[0] == '\0' && p.nuls[1] == '\0' && p.pid != 0 &&
+             p.len > 0 && p.len <= sizeof(PipeChunk) - PIPE_DATA_OFFSET)
+         {
+             /* save a partial chunk in the fragment buffer */
+             if (p.len + PIPE_DATA_OFFSET > count)
+             {
+                 memcpy(read_fragment, cursor, count);
+                 read_fragment_len = count;
+                 return;
+             }
+             /*
+              * save a complete non-final chunk in the per-pid buffer
+              * if possible - if not just write it out.
+              */
+             else if ( ! p.is_last )
+             {
+                 int free_slot = -1, existing_slot = -1;
+                 int i;
+                 for (i = 0; i < CHUNK_SLOTS; i++)
+                 {
+                     if (saved_chunks[i].pid == 0 && free_slot < 0)
+                         free_slot = i;
+                     if (saved_chunks[i].pid == p.pid)
+                     {
+                         existing_slot = i;
+                         break;
+                     }
+                 }
+                 if (existing_slot > -1)
+                 {
+                     appendBinaryStringInfo(&saved_chunks[existing_slot].data,
+                                            cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+                 else if (free_slot > -1)
+                 {
+                     /* first chunk of a new message: claim the free slot */
+                     saved_chunks[free_slot].pid = p.pid;
+                     initStringInfo(&saved_chunks[free_slot].data);
+                     appendBinaryStringInfo(&saved_chunks[free_slot].data,
+                                            cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+                 else
+                 {
+                     /*
+                      * if there is no existing or free slot we'll just have to
+                      * take our chances and write out a part message and hope
+                      * that it's not followed by something from another pid.
+                      */
+                     write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+                 count -= PIPE_DATA_OFFSET + p.len;
+                 cursor += PIPE_DATA_OFFSET + p.len;
+             }
+             /*
+              * add a final chunk to anything saved for that pid, and either way
+              * write the whole thing out.
+              */
+             else
+             {
+                 int existing_slot = -1;
+                 int i;
+                 for (i = 0; i < CHUNK_SLOTS; i++)
+                 {
+                     if (saved_chunks[i].pid == p.pid)
+                     {
+                         existing_slot = i;
+                         break;
+                     }
+                 }
+                 if (existing_slot > -1)
+                 {
+                     appendBinaryStringInfo(&saved_chunks[existing_slot].data,
+                                            cursor + PIPE_DATA_OFFSET, p.len);
+                     write_chunk(saved_chunks[existing_slot].data.data,
+                                 saved_chunks[existing_slot].data.len);
+                     saved_chunks[existing_slot].pid = 0;
+                     pfree(saved_chunks[existing_slot].data.data);
+                 }
+                 else
+                 {
+                     /* the whole message was one chunk, probably. */
+                     write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+                 count -= PIPE_DATA_OFFSET + p.len;
+                 cursor += PIPE_DATA_OFFSET + p.len;
+             }
+
+         }
+         else    /* not a valid chunk header: plain, non-protocol text */
+         {
+             /* look for the start of a protocol header */
+             for (chunklen = 1; chunklen + 1 < count; chunklen++)
+             {
+                 if (cursor[chunklen] == '\0' && cursor[chunklen + 1] == '\0')
+                     break;
+             }
+             /* if no protocol header, write out the whole remaining buffer */
+             if (chunklen + 1 >= count)
+             {
+                 write_chunk(cursor, count);
+                 read_fragment_len = 0;
+                 return;
+             }
+             /* write out the text preceding the header, then loop again */
+             write_chunk(cursor, chunklen);
+             cursor += chunklen;
+             count -= chunklen;
+         }
+     }
+     read_fragment_len = 0;      /* all input consumed; nothing carried over */
+ }
+
+ static void
+ write_chunk(const char *buffer, int count)
+ {
      int            rc;

  #ifndef WIN32
***************
*** 654,664 ****
  pipeThread(void *arg)
  {
      DWORD        bytesRead;
!     char        logbuffer[1024];

      for (;;)
      {
!         if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
                        &bytesRead, 0))
          {
              DWORD        error = GetLastError();
--- 808,820 ----
  pipeThread(void *arg)
  {
      DWORD        bytesRead;
!     char        logbuffer[READ_BUF_SIZE];

      for (;;)
      {
!         /* prepend any partial chunk left over from the previous read */
!         memcpy(logbuffer, read_fragment, read_fragment_len);
!         if (!ReadFile(syslogPipe[0], logbuffer + read_fragment_len, READ_SIZE,
                        &bytesRead, 0))
          {
              DWORD        error = GetLastError();
***************
*** 672,682 ****
                       errmsg("could not read from logger pipe: %m")));
          }
          else if (bytesRead > 0)
!             write_syslogger_file(logbuffer, bytesRead);
      }

      /* We exit the above loop only upon detecting pipe EOF */
      pipe_eof_seen = true;
      _endthread();
      return 0;
  }
--- 827,842 ----
                       errmsg("could not read from logger pipe: %m")));
          }
          else if (bytesRead > 0)
!             write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
      }

      /* We exit the above loop only upon detecting pipe EOF */
      pipe_eof_seen = true;
+
+     /* if there's a fragment left then force it out now */
+     if (read_fragment_len)
+         write_chunk(read_fragment, read_fragment_len);
+
      _endthread();
      return 0;
  }
Index: src/backend/utils/error/elog.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/utils/error/elog.c,v
retrieving revision 1.186
diff -c -r1.186 elog.c
*** src/backend/utils/error/elog.c    7 Jun 2007 21:45:59 -0000    1.186
--- src/backend/utils/error/elog.c    12 Jun 2007 23:23:43 -0000
***************
*** 56,61 ****
--- 56,62 ----
  #ifdef HAVE_SYSLOG
  #include <syslog.h>
  #endif
+ #include <limits.h>

  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 71,76 ****
--- 72,78 ----
  #include "utils/ps_status.h"


+
  /* Global variables */
  ErrorContextCallback *error_context_stack = NULL;

***************
*** 124,129 ****
--- 126,135 ----
  static const char *error_severity(int elevel);
  static void append_with_tabs(StringInfo buf, const char *str);
  static bool is_log_level_output(int elevel, int log_min_level);
+ static void write_pipe_chunks(int fd, char * data, int len);
+
+ /* max payload per chunk; header + payload always fits in one PipeChunk */
+ #define MAX_CHUNK (sizeof(PipeChunk) - sizeof(PipeProto))


  /*
***************
*** 1783,1789 ****
              write_eventlog(edata->elevel, buf.data);
          else
  #endif
!             fprintf(stderr, "%s", buf.data);
      }

      /* If in the syslogger process, try to write messages direct to file */
--- 1789,1798 ----
              write_eventlog(edata->elevel, buf.data);
          else
  #endif
!         if (Redirect_stderr)
!             write_pipe_chunks(fileno(stderr),buf.data, buf.len);
!         else
!             write(fileno(stderr), buf.data, buf.len);
      }

      /* If in the syslogger process, try to write messages direct to file */
***************
*** 1794,1799 ****
--- 1803,1844 ----
  }


+ /*
+  * Send a message to the syslogger pipe as a series of protocol chunks.
+  *
+  * Each chunk has a PipeProto header (two NULs, payload length, our pid and
+  * an is_last flag) so the syslogger can reassemble messages from several
+  * processes even when their chunks interleave in the pipe.  Every write()
+  * is at most sizeof(PipeChunk) bytes, which keeps it within PIPE_BUF.
+  */
+ static void
+ write_pipe_chunks(int fd, char * data, int len)
+ {
+     PipeChunk p;
+
+     Assert(len > 0);
+
+     p.proto.nuls[0] = p.proto.nuls[1] = '\0';
+     p.proto.pid = MyProcPid;
+     p.proto.is_last = false;
+     p.proto.len = MAX_CHUNK;
+
+     /* write all but the last chunk */
+     while (len > MAX_CHUNK)
+     {
+         memcpy(p.proto.data, data, MAX_CHUNK);
+         write(fd, &p, PIPE_DATA_OFFSET + MAX_CHUNK);
+         data += MAX_CHUNK;
+         len -= MAX_CHUNK;
+     }
+
+     /* write the last chunk */
+     p.proto.is_last = true;
+     p.proto.len = len;
+     memcpy(p.proto.data, data, len);
+     write(fd, &p, PIPE_DATA_OFFSET + len);
+ }
+
  /*
   * Write error report to client
   */
Index: src/include/postmaster/syslogger.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/postmaster/syslogger.h,v
retrieving revision 1.8
diff -c -r1.8 syslogger.h
*** src/include/postmaster/syslogger.h    5 Jan 2007 22:19:57 -0000    1.8
--- src/include/postmaster/syslogger.h    12 Jun 2007 23:23:48 -0000
***************
*** 37,40 ****
--- 37,74 ----
  extern void SysLoggerMain(int argc, char *argv[]);
  #endif

+ #include <limits.h>        /* for PIPE_BUF */
+
+ /*
+  * Size of one pipe chunk, header included.  It must not exceed PIPE_BUF,
+  * because POSIX only guarantees that pipe writes of at most PIPE_BUF bytes
+  * are atomic.  Where PIPE_BUF is not defined, use 512 (the POSIX minimum);
+  * cap it at 64K so the payload length always fits in the uint16 len field.
+  */
+ #if !defined(PIPE_BUF)
+ #define PIPE_CHUNK_SIZE 512
+ #elif PIPE_BUF > 65536
+ #define PIPE_CHUNK_SIZE 65536
+ #else
+ #define PIPE_CHUNK_SIZE ((int) PIPE_BUF)
+ #endif
+
+ /* primitive protocol structure for writing to syslogger pipe(s) */
+ typedef struct
+ {
+     char      nuls[2];    /* always \0\0 */
+     uint16    len;        /* number of payload bytes in this chunk */
+     pid_t     pid;        /* our pid */
+     bool      is_last;    /* is this the last chunk? */
+     char      data[1];    /* payload; continues into the PipeChunk union */
+ } PipeProto;
+
+ typedef union
+ {
+     PipeProto    proto;
+     char         data[PIPE_CHUNK_SIZE];
+ }  PipeChunk;
+
+ #define PIPE_DATA_OFFSET offsetof(PipeProto, data) /* 9 usually */
+
  #endif   /* _SYSLOGGER_H */

pgsql-patches by date:

Previous
From: Tom Lane
Date:
Subject: Re: Two aesthetic bugs in the 1-byte packed varlena code
Next
From: Satoshi Nagayasu
Date:
Subject: Re: trace_checkpoint parameter patch