Re: [HACKERS] COPYable logs status - Mailing list pgsql-patches
From | Andrew Dunstan |
---|---|
Subject | Re: [HACKERS] COPYable logs status |
Date | |
Msg-id | 466F2E8D.4020808@dunslane.net Whole thread Raw |
Responses |
Re: [HACKERS] COPYable logs status
|
List | pgsql-patches |
Tom Lane wrote: > Andrew Dunstan <andrew@dunslane.net> writes: > >> I'll try to get a patch out for just the stderr case, which should be >> back-patchable, then adjust the CSVlog patch to use it. >> > > Sounds like a plan. > > >> I'm thinking of handling the partial lines with a small dynahash of >> StringInfo buffers, which get discarded whenever we don't have a partial >> line for the PID. >> > > A hashtable might be overkill --- based on reports so far, it's unlikely > you'd have more than two or three messages being received concurrently, > so a simple list or array might be quicker to search. > > > Attached is a WIP patch ... I still have some debugging to do but I think the basic logic is there. Comments welcome. ATM it gets stuck in running installcheck and gdb shows the logger hanging here: enlargeStringInfo (str=0x9a91c8, needed=4085) at stringinfo.c:263 263 newlen = 2 * newlen; Can I not use a StringInfo in the syslogger? cheers andrew Index: src/backend/postmaster/syslogger.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/postmaster/syslogger.c,v retrieving revision 1.31 diff -c -r1.31 syslogger.c *** src/backend/postmaster/syslogger.c 4 Jun 2007 22:21:42 -0000 1.31 --- src/backend/postmaster/syslogger.c 12 Jun 2007 23:23:38 -0000 *************** *** 42,47 **** --- 42,48 ---- #include "utils/guc.h" #include "utils/ps_status.h" #include "utils/timestamp.h" + #include "lib/stringinfo.h" /* * We really want line-buffered mode for logfile output, but Windows does *************** *** 54,59 **** --- 55,76 ---- #define LBF_MODE _IOLBF #endif + #if PIPE_BUF > 1024 + #define READ_SIZE PIPE_BUF + #else + #define READ_SIZE 1024 + #endif + + /* + * we use a buffer twice as big as a read so that if there is a fragment left + * after process what is read we can save it and copy it back before the next + * read. + */ + #define READ_BUF_SIZE 2 * READ_SIZE + + /* buffer to keep any partial chunks read between calls to read()/ReadFile() */ + static char * read_fragment[READ_SIZE]; + static int read_fragment_len = 0; /* * GUC parameters. Redirect_stderr cannot be changed after postmaster *************** *** 75,89 **** * Private state */ static pg_time_t next_rotation_time; - static bool redirection_done = false; - static bool pipe_eof_seen = false; - static FILE *syslogFile = NULL; - static char *last_file_name = NULL; /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; --- 92,110 ---- * Private state */ static pg_time_t next_rotation_time; static bool redirection_done = false; static bool pipe_eof_seen = false; static FILE *syslogFile = NULL; static char *last_file_name = NULL; + typedef struct + { + pid_t pid; + StringInfoData data; + } save_buffer; + #define CHUNK_SLOTS 20 + static save_buffer saved_chunks[CHUNK_SLOTS]; + /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; *************** *** 117,123 **** static void set_next_rotation_time(void); static void sigHupHandler(SIGNAL_ARGS); static void sigUsr1Handler(SIGNAL_ARGS); ! /* * Main entry point for syslogger process --- 138,144 ---- static void set_next_rotation_time(void); static void sigHupHandler(SIGNAL_ARGS); static void sigUsr1Handler(SIGNAL_ARGS); ! static void write_chunk(const char * buffer, int count); /* * Main entry point for syslogger process *************** *** 244,250 **** bool time_based_rotation = false; #ifndef WIN32 ! char logbuffer[1024]; int bytesRead; int rc; fd_set rfds; --- 265,271 ---- bool time_based_rotation = false; #ifndef WIN32 ! char logbuffer[READ_BUF_SIZE]; int bytesRead; int rc; fd_set rfds; *************** *** 325,332 **** } else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { bytesRead = piperead(syslogPipe[0], ! logbuffer, sizeof(logbuffer)); if (bytesRead < 0) { --- 346,354 ---- } else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { + memcpy(logbuffer, read_fragment, read_fragment_len); bytesRead = piperead(syslogPipe[0], ! logbuffer + read_fragment_len, READ_SIZE); if (bytesRead < 0) { *************** *** 337,343 **** } else if (bytesRead > 0) { ! write_syslogger_file(logbuffer, bytesRead); continue; } else --- 359,365 ---- } else if (bytesRead > 0) { ! write_syslogger_file(logbuffer, bytesRead + read_fragment_len); continue; } else *************** *** 349,354 **** --- 371,380 ---- * and all backends are shut down, and we are done. */ pipe_eof_seen = true; + + /* if there's a fragment left then force it out now */ + if (read_fragment_len) + write_chunk(read_fragment, read_fragment_len); } } #else /* WIN32 */ *************** *** 626,631 **** --- 652,785 ---- void write_syslogger_file(const char *buffer, int count) { + char *cursor = (char *) buffer; + int chunklen; + PipeProto p; + while (count > 0) + { + /* not enough data even for a header? save it until we get more */ + if (count < sizeof(PipeProto)) + { + memcpy(read_fragment, cursor, count); + read_fragment_len = count; + return; + } + /* process protocol chunks */ + if ( cursor[0] == '\0' && cursor[1] == '\0' ) + { + memcpy(&p,cursor,sizeof(PipeProto)); + /* save a partial chunk in the fragment buffer */ + if (p.len + PIPE_DATA_OFFSET > count) + { + memcpy(read_fragment, cursor, count); + read_fragment_len = count; + return; + } + /* + * save a complete non-final chunk in the poer-pid buffer + * if possible - if not just write it out. + */ + else if ( ! p.is_last ) + { + int free_slot = -1, existing_slot = -1; + int i; + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == 0 && free_slot < 0) + free_slot = i; + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot > -1) + { + appendBinaryStringInfo(&saved_chunks[existing_slot].data, + cursor + PIPE_DATA_OFFSET, p.len); + } + else if (free_slot > -1) + { + saved_chunks[free_slot].pid = p.pid; + initStringInfo(&saved_chunks[free_slot].data); + appendBinaryStringInfo(&saved_chunks[existing_slot].data, + cursor + PIPE_DATA_OFFSET, p.len); + } + else + { + /* + * if there is no exisiting or free slot we'll just have to + * take our chances and write out a part message and hope + * that it's not followed by something from another pid. + */ + write_chunk(cursor + PIPE_DATA_OFFSET, p.len); + } + count -= PIPE_DATA_OFFSET + p.len; + cursor += PIPE_DATA_OFFSET + p.len; + } + /* + * add a final chunk to anything saved for that pid, and either way + * write the whole thing out. + */ + else + { + int existing_slot = -1; + int i; + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot > -1) + { + appendBinaryStringInfo(&saved_chunks[existing_slot].data, + cursor + PIPE_DATA_OFFSET, p.len); + write_chunk(saved_chunks[existing_slot].data.data, + saved_chunks[existing_slot].data.len); + saved_chunks[existing_slot].pid = 0; + pfree(saved_chunks[existing_slot].data.data); + } + else + { + /* the whole message was one chunk, probably. */ + write_chunk(cursor + PIPE_DATA_OFFSET, p.len); + } + count -= PIPE_DATA_OFFSET + p.len; + cursor += PIPE_DATA_OFFSET + p.len; + } + + } + /* process non-protocol chunks */ + { + /* look for the start of a protocol header */ + for(chunklen = 1; chunklen + 1 < count; chunklen++) + { + if (cursor[chunklen] == '\0' && cursor[chunklen + 1] == '\0') + { + write_chunk(cursor, chunklen); + cursor += chunklen; + count -= chunklen; + break; + } + } + /* if no protocol header, write out the whole remaining buffer */ + if (chunklen + 1 >= count) + { + write_chunk(cursor, count); + read_fragment_len = 0; + return; + } + } + } + + } + + void + write_chunk(const char *buffer, int count) + { int rc; #ifndef WIN32 *************** *** 654,664 **** pipeThread(void *arg) { DWORD bytesRead; ! char logbuffer[1024]; for (;;) { ! if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer), &bytesRead, 0)) { DWORD error = GetLastError(); --- 808,819 ---- pipeThread(void *arg) { DWORD bytesRead; ! char logbuffer[READ_BUF_SIZE]; for (;;) { ! memcpy(logbuffer, read_buffer, read_fragment_len); ! if (!ReadFile(syslogPipe[0], logbuffer + read_fragment_len, READ_SIZE, &bytesRead, 0)) { DWORD error = GetLastError(); *************** *** 672,682 **** errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) ! write_syslogger_file(logbuffer, bytesRead); } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; _endthread(); return 0; } --- 827,842 ---- errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) ! write_syslogger_file(logbuffer, bytesRead + read_fragment_len); } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; + + /* if there's a fragment left then force it out now */ + if (read_fragment_len) + write_chunk(read_fragment, read_fragment_len); + _endthread(); return 0; } Index: src/backend/utils/error/elog.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/utils/error/elog.c,v retrieving revision 1.186 diff -c -r1.186 elog.c *** src/backend/utils/error/elog.c 7 Jun 2007 21:45:59 -0000 1.186 --- src/backend/utils/error/elog.c 12 Jun 2007 23:23:43 -0000 *************** *** 56,61 **** --- 56,62 ---- #ifdef HAVE_SYSLOG #include <syslog.h> #endif + #include <limits.h> #include "access/transam.h" #include "access/xact.h" *************** *** 71,76 **** --- 72,78 ---- #include "utils/ps_status.h" + /* Global variables */ ErrorContextCallback *error_context_stack = NULL; *************** *** 124,129 **** --- 126,135 ---- static const char *error_severity(int elevel); static void append_with_tabs(StringInfo buf, const char *str); static bool is_log_level_output(int elevel, int log_min_level); + static void write_pipe_chunks(int fd, char * data, int len); + + /* allow space for preamble plus a little head room */ + #define MAX_CHUNK (sizeof(PipeChunk) - sizeof(PipeProto)) /* *************** *** 1783,1789 **** write_eventlog(edata->elevel, buf.data); else #endif ! fprintf(stderr, "%s", buf.data); } /* If in the syslogger process, try to write messages direct to file */ --- 1789,1798 ---- write_eventlog(edata->elevel, buf.data); else #endif ! if (Redirect_stderr) ! write_pipe_chunks(fileno(stderr),buf.data, buf.len); ! else ! write(fileno(stderr), buf.data, buf.len); } /* If in the syslogger process, try to write messages direct to file */ *************** *** 1794,1799 **** --- 1803,1838 ---- } + static void + write_pipe_chunks(int fd, char * data, int len) + { + PipeChunk p; + + Assert(len > 0); + + p.proto.nuls[0] = p.proto.nuls[1] = '\0'; + p.proto.pid = MyProcPid; + p.proto.is_last = false; + p.proto.len = MAX_CHUNK; + + write_stderr("total len is %d\n",len); + + /* write all but the last chunk */ + while (len > MAX_CHUNK) + { + memcpy(p.proto.data, data, MAX_CHUNK); + write(fd, &p, PIPE_DATA_OFFSET + MAX_CHUNK ); + data += MAX_CHUNK; + len -= MAX_CHUNK; + } + + /* write the last chunk */ + p.proto.is_last = true; + p.proto.len = len; + memcpy(p.proto.data, data, len); + write(fd, &p, PIPE_DATA_OFFSET + len); + } + /* * Write error report to client */ Index: src/include/postmaster/syslogger.h =================================================================== RCS file: /cvsroot/pgsql/src/include/postmaster/syslogger.h,v retrieving revision 1.8 diff -c -r1.8 syslogger.h *** src/include/postmaster/syslogger.h 5 Jan 2007 22:19:57 -0000 1.8 --- src/include/postmaster/syslogger.h 12 Jun 2007 23:23:48 -0000 *************** *** 37,40 **** --- 37,58 ---- extern void SysLoggerMain(int argc, char *argv[]); #endif + /* primitive protocol structure for writing to syslogger pipe(s) */ + typedef struct + { + char nuls[2]; /* always \0\0 */ + uint16 len; /* size of this chunk */ + pid_t pid; /* our pid */ + bool is_last; /* is this the last chunk? */ + char data[1]; + } PipeProto; + + typedef union + { + PipeProto proto; + char data[PIPE_BUF]; + } PipeChunk; + + #define PIPE_DATA_OFFSET offsetof(PipeProto, data) /* 9 usually */ + #endif /* _SYSLOGGER_H */
pgsql-patches by date: