Re: pipe chunks protocol - Mailing list pgsql-patches
From | Andrew Dunstan |
---|---|
Subject | Re: pipe chunks protocol |
Date | |
Msg-id | 467012DF.6060108@dunslane.net Whole thread Raw |
In response to | pipe chunks protocol (Andrew Dunstan <andrew@dunslane.net>) |
Responses |
Re: pipe chunks protocol
|
List | pgsql-patches |
And here's the patch. Andrew Dunstan wrote: > > This patch implements the protocol Tom suggested for writing to the > syslogger pipe. It seems to pass my tests (basically "make > installcheck" against a server with stderr redirection turned on and > log_statement set to 'all'). > > The effect of this should be to prevent two problems: > . partial messages get written to the log file, which messes with > rotation, and > . messages from various backends get interleaved, causing garbled logs. > > Please review ASAP. I want to get this applied soon so that a) it gets > wider testing and b) I can use it as the basis for the adapted CSV log > patch. If this is acceptable I intend to backpatch this all the way to > wherever we started using the syslogger pipe (was that 8.0?). > > cheers > > andrew > > Index: src/backend/postmaster/syslogger.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/postmaster/syslogger.c,v retrieving revision 1.31 diff -c -r1.31 syslogger.c *** src/backend/postmaster/syslogger.c 4 Jun 2007 22:21:42 -0000 1.31 --- src/backend/postmaster/syslogger.c 13 Jun 2007 15:38:07 -0000 *************** *** 42,47 **** --- 42,48 ---- #include "utils/guc.h" #include "utils/ps_status.h" #include "utils/timestamp.h" + #include "lib/stringinfo.h" /* * We really want line-buffered mode for logfile output, but Windows does *************** *** 54,59 **** --- 55,77 ---- #define LBF_MODE _IOLBF #endif + /* try not to break chunked messages into multiple reads */ + #if PIPE_BUF > 1024 + #define READ_SIZE PIPE_BUF + #else + #define READ_SIZE 1024 + #endif + + /* + * we use a buffer twice as big as a read so that if there is a fragment left + * after processing what is read we can save it and copy it back before the + * next read. 
+ */ + #define READ_BUF_SIZE 2 * READ_SIZE + + /* buffer to keep any partial chunks read between calls to read()/ReadFile() */ + static char * read_fragment[READ_SIZE]; + static int read_fragment_len = 0; /* * GUC parameters. Redirect_stderr cannot be changed after postmaster *************** *** 75,89 **** * Private state */ static pg_time_t next_rotation_time; - static bool redirection_done = false; - static bool pipe_eof_seen = false; - static FILE *syslogFile = NULL; - static char *last_file_name = NULL; /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; --- 93,117 ---- * Private state */ static pg_time_t next_rotation_time; static bool redirection_done = false; static bool pipe_eof_seen = false; static FILE *syslogFile = NULL; static char *last_file_name = NULL; + /* + * buffers for saving partial messages from different backends. We don't expect + * that there will be very many outstanding at one time, so 20 seems plenty of + * leeway. If this array gets full we won't lose messages, but we will lose + * the protocol protection against them being partially written or interleaved. + */ + typedef struct + { + pid_t pid; + StringInfoData data; + } save_buffer; + #define CHUNK_SLOTS 20 + static save_buffer saved_chunks[CHUNK_SLOTS]; + /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; *************** *** 117,123 **** static void set_next_rotation_time(void); static void sigHupHandler(SIGNAL_ARGS); static void sigUsr1Handler(SIGNAL_ARGS); ! /* * Main entry point for syslogger process --- 145,151 ---- static void set_next_rotation_time(void); static void sigHupHandler(SIGNAL_ARGS); static void sigUsr1Handler(SIGNAL_ARGS); ! static void write_chunk(const char * buffer, int count); /* * Main entry point for syslogger process *************** *** 244,250 **** bool time_based_rotation = false; #ifndef WIN32 ! 
char logbuffer[1024]; int bytesRead; int rc; fd_set rfds; --- 272,278 ---- bool time_based_rotation = false; #ifndef WIN32 ! char logbuffer[READ_BUF_SIZE]; int bytesRead; int rc; fd_set rfds; *************** *** 325,332 **** } else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { bytesRead = piperead(syslogPipe[0], ! logbuffer, sizeof(logbuffer)); if (bytesRead < 0) { --- 353,363 ---- } else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { + Assert (read_fragment_len <= READ_SIZE); + + memcpy(logbuffer, read_fragment, read_fragment_len); bytesRead = piperead(syslogPipe[0], ! logbuffer + read_fragment_len, READ_SIZE); if (bytesRead < 0) { *************** *** 337,343 **** } else if (bytesRead > 0) { ! write_syslogger_file(logbuffer, bytesRead); continue; } else --- 368,374 ---- } else if (bytesRead > 0) { ! write_syslogger_file(logbuffer, bytesRead + read_fragment_len); continue; } else *************** *** 349,354 **** --- 380,389 ---- * and all backends are shut down, and we are done. */ pipe_eof_seen = true; + + /* if there's a fragment left then force it out now */ + if (read_fragment_len) + write_chunk(read_fragment, read_fragment_len); } } #else /* WIN32 */ *************** *** 622,631 **** --- 657,821 ---- * This is exported so that elog.c can call it when am_syslogger is true. * This allows the syslogger process to record elog messages of its own, * even though its stderr does not point at the syslog pipe. + * + * This routine processes the log pipe protocol which sends log messages as + * chunks - such chunks are detected and reassembled here. + * The protocol has a header that starts with two nul bytes, then has a 16 bit + * length, the pid of the sending process, and a flag to indicate if it is + * the last chunk in a message. Incomplete chunks are saved until we read some + * more, and non-final chunks are accumulated until we get the final chunk. + * + * All of this is to avoid 2 problems: + * . 
partial messages being written to logfiles, (messes rotation) and + * . messages from different backends being interleaved (messages garbled). + * + * Any non-protocol messages are written out directly. These should only come + * from non-PostgreSQL sources, however (e.g. third party libraries writing to + * stderr). This won't matter for CSV output, which will be a separate + * reporting channel. */ void write_syslogger_file(const char *buffer, int count) { + char *cursor = (char *) buffer; + int chunklen; + PipeProto p; + + /* the buffer has any fragment we had saved, so reset the length */ + read_fragment_len = 0; + + + while (count > 0) + { + /* not enough data even for a header? save it until we get more */ + if (count < sizeof(PipeProto)) + { + memcpy(read_fragment, cursor, count); + read_fragment_len = count; + return; + } + /* process protocol chunks */ + if ( cursor[0] == '\0' && cursor[1] == '\0' ) + { + memcpy(&p,cursor,sizeof(PipeProto)); + /* save a partial chunk in the fragment buffer */ + if (p.len + PIPE_DATA_OFFSET > count) + { + Assert(count <= READ_SIZE); + memcpy(read_fragment, cursor, count); + read_fragment_len = count; + return; + } + /* + * save a complete non-final chunk in the per-pid buffer + * if possible - if not just write it out. 
+ */ + else if ( p.is_last != 't') + { + int free_slot = -1, existing_slot = -1; + int i; + StringInfo str; + + Assert (p.is_last == 'f'); + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == 0 && free_slot < 0) + free_slot = i; + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot > -1) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET, + p.len); + } + else if (free_slot > -1) + { + saved_chunks[free_slot].pid = p.pid; + str = &(saved_chunks[free_slot].data); + initStringInfo(str); + appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET, + p.len); + } + else + { + /* + * if there is no existing or free slot we'll just have to + * take our chances and write out a part message and hope + * that it's not followed by something from another pid. + */ + write_chunk(cursor + PIPE_DATA_OFFSET, p.len); + } + } + /* + * add a final chunk to anything saved for that pid, and either way + * write the whole thing out. + */ + else + { + int existing_slot = -1; + int i; + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot > -1) + { + appendBinaryStringInfo(&(saved_chunks[existing_slot].data), + cursor + PIPE_DATA_OFFSET, p.len); + write_chunk(saved_chunks[existing_slot].data.data, + saved_chunks[existing_slot].data.len); + saved_chunks[existing_slot].pid = 0; + pfree(saved_chunks[existing_slot].data.data); + } + else + { + /* the whole message was one chunk, probably. 
*/ + write_chunk(cursor + PIPE_DATA_OFFSET, p.len); + } + } + + count -= PIPE_DATA_OFFSET + p.len; + cursor += PIPE_DATA_OFFSET + p.len; + } + /* process non-protocol chunks */ + else + { + /* look for the start of a protocol header */ + for(chunklen = 1; chunklen + 1 < count; chunklen++) + { + if (cursor[chunklen] == '\0' && cursor[chunklen + 1] == '\0') + { + write_chunk(cursor, chunklen); + cursor += chunklen; + count -= chunklen; + break; + } + } + /* if no protocol header, write out the whole remaining buffer */ + if (chunklen + 1 >= count) + { + write_chunk(cursor, count); + read_fragment_len = 0; + return; + } + } + } + + } + + void + write_chunk(const char *buffer, int count) + { int rc; #ifndef WIN32 *************** *** 654,664 **** pipeThread(void *arg) { DWORD bytesRead; ! char logbuffer[1024]; for (;;) { ! if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer), &bytesRead, 0)) { DWORD error = GetLastError(); --- 844,856 ---- pipeThread(void *arg) { DWORD bytesRead; ! char logbuffer[READ_BUF_SIZE]; for (;;) { ! Assert (read_fragment_len <= READ_SIZE); ! memcpy(logbuffer, read_buffer, read_fragment_len); ! if (!ReadFile(syslogPipe[0], logbuffer + read_fragment_len, READ_SIZE, &bytesRead, 0)) { DWORD error = GetLastError(); *************** *** 672,682 **** errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) ! write_syslogger_file(logbuffer, bytesRead); } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; _endthread(); return 0; } --- 864,879 ---- errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) ! 
write_syslogger_file(logbuffer, bytesRead + read_fragment_len); } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; + + /* if there's a fragment left then force it out now */ + if (read_fragment_len) + write_chunk(read_fragment, read_fragment_len); + _endthread(); return 0; } Index: src/backend/utils/error/elog.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/utils/error/elog.c,v retrieving revision 1.186 diff -c -r1.186 elog.c *** src/backend/utils/error/elog.c 7 Jun 2007 21:45:59 -0000 1.186 --- src/backend/utils/error/elog.c 13 Jun 2007 15:38:08 -0000 *************** *** 56,61 **** --- 56,62 ---- #ifdef HAVE_SYSLOG #include <syslog.h> #endif + #include <limits.h> #include "access/transam.h" #include "access/xact.h" *************** *** 71,76 **** --- 72,78 ---- #include "utils/ps_status.h" + /* Global variables */ ErrorContextCallback *error_context_stack = NULL; *************** *** 124,129 **** --- 126,135 ---- static const char *error_severity(int elevel); static void append_with_tabs(StringInfo buf, const char *str); static bool is_log_level_output(int elevel, int log_min_level); + static void write_pipe_chunks(int fd, char * data, int len); + + /* allow space for preamble plus a little head room */ + #define MAX_CHUNK (sizeof(PipeChunk) - sizeof(PipeProto)) /* *************** *** 1783,1789 **** write_eventlog(edata->elevel, buf.data); else #endif ! fprintf(stderr, "%s", buf.data); } /* If in the syslogger process, try to write messages direct to file */ --- 1789,1798 ---- write_eventlog(edata->elevel, buf.data); else #endif ! if (Redirect_stderr) ! write_pipe_chunks(fileno(stderr),buf.data, buf.len); ! else ! 
write(fileno(stderr), buf.data, buf.len); } /* If in the syslogger process, try to write messages direct to file */ *************** *** 1794,1799 **** --- 1803,1836 ---- } + static void + write_pipe_chunks(int fd, char * data, int len) + { + PipeChunk p; + + Assert(len > 0); + + p.proto.nuls[0] = p.proto.nuls[1] = '\0'; + p.proto.pid = MyProcPid; + p.proto.is_last = 'f'; + p.proto.len = MAX_CHUNK; + + /* write all but the last chunk */ + while (len > MAX_CHUNK) + { + memcpy(p.proto.data, data, MAX_CHUNK); + write(fd, &p, PIPE_DATA_OFFSET + MAX_CHUNK ); + data += MAX_CHUNK; + len -= MAX_CHUNK; + } + + /* write the last chunk */ + p.proto.is_last = 't'; + p.proto.len = len; + memcpy(p.proto.data, data, len); + write(fd, &p, PIPE_DATA_OFFSET + len); + } + /* * Write error report to client */ Index: src/include/postmaster/syslogger.h =================================================================== RCS file: /cvsroot/pgsql/src/include/postmaster/syslogger.h,v retrieving revision 1.8 diff -c -r1.8 syslogger.h *** src/include/postmaster/syslogger.h 5 Jan 2007 22:19:57 -0000 1.8 --- src/include/postmaster/syslogger.h 13 Jun 2007 15:38:09 -0000 *************** *** 37,40 **** --- 37,64 ---- extern void SysLoggerMain(int argc, char *argv[]); #endif + /* + * primitive protocol structure for writing to syslogger pipe(s). + * + * we use 't' or 'f' instead of a bool to make the protocol a tiny bit + * more robust against finding a false double nul byte prologue. + * But we still might find it in the len and/or pid bytes unless we're careful. + */ + typedef struct + { + char nuls[2]; /* always \0\0 */ + uint16 len; /* size of this chunk */ + pid_t pid; /* our pid */ + char is_last; /* is this the last chunk? 't' or 'f' */ + char data[1]; + } PipeProto; + + typedef union + { + PipeProto proto; + char data[PIPE_BUF]; + } PipeChunk; + + #define PIPE_DATA_OFFSET offsetof(PipeProto, data) /* 9 usually */ + #endif /* _SYSLOGGER_H */
pgsql-patches by date: