*** a/contrib/Makefile --- b/contrib/Makefile *************** *** 32,37 **** SUBDIRS = \ --- 32,38 ---- pg_archivecleanup \ pg_buffercache \ pg_freespacemap \ + pg_logcollectdup \ pg_standby \ pg_stat_statements \ pg_test_fsync \ *** /dev/null --- b/contrib/pg_logcollectdup/Makefile *************** *** 0 **** --- 1,15 ---- + MODULE_big = pg_logcollectdup + OBJS = pg_logcollectdup.o + + EXTENSION = pg_logcollectdup + + ifdef USE_PGXS + PG_CONFIG = pg_config + PGXS := $(shell $(PG_CONFIG) --pgxs) + include $(PGXS) + else + subdir = contrib/pg_logcollectdup + top_builddir = ../.. + include $(top_builddir)/src/Makefile.global + include $(top_srcdir)/contrib/contrib-global.mk + endif *** /dev/null --- b/contrib/pg_logcollectdup/pg_logcollect_sample.c *************** *** 0 **** --- 1,207 ---- + /* + * pg_logcollect_sample.c + * + * Implements a stand-alone program that can read log collector aka syslogger + * pipe traffic and print it out in a readable character sequences (NUL bytes + * and binary numbers converted), without making any effort to defragment it. + * It's mostly intended to be a demonstration or sample, although it may be + * useful in its own right. + * + * Notably, this program does not link against Postgres at all. + */ + #include + #include + #include + #include + #include + + /* Taken from postgres/src/include/c.h */ + typedef unsigned short uint16; + typedef signed int int32; + + /* PIPE_CHUNK_SIZE definition taken from syslogger.h */ + + /* + * Primitive protocol structure for writing to syslogger pipe(s). The idea + * here is to divide long messages into chunks that are not more than + * PIPE_BUF bytes long, which according to POSIX spec must be written into + * the pipe atomically. The pipe reader then uses the protocol headers to + * reassemble the parts of a message into a single string. The reader can + * also cope with non-protocol data coming down the pipe, though we cannot + * guarantee long strings won't get split apart. + * + * We use non-nul bytes in is_last to make the protocol a tiny bit + * more robust against finding a false double nul byte prologue. But + * we still might find it in the len and/or pid bytes unless we're careful. + */ + + #ifdef PIPE_BUF + /* Are there any systems with PIPE_BUF > 64K? Unlikely, but ... */ + #if PIPE_BUF > 65536 + #define PIPE_CHUNK_SIZE 65536 + #else + #define PIPE_CHUNK_SIZE ((int) PIPE_BUF) + #endif + #else /* not defined */ + /* POSIX says the value of PIPE_BUF must be at least 512, so use that */ + #define PIPE_CHUNK_SIZE 512 + #endif + + + /* End PIPE_CHUNK_SIZE define */ + + /* Constants defined by both the protocol and the system's PIPE_BUF */ + #define PIPE_HEADER_SIZE (9) + #define PIPE_MAX_PAYLOAD ((PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE)) + + static ssize_t safe_read(int fd, void *buf, size_t count); + static void printInput(char *logbuffer, int *bytes_in_logbuffer); + + /* + * read, but retry as long as one receives EINTR. + */ + ssize_t + safe_read(int fd, void *buf, size_t count) + { + const int save_errno = 0; + ssize_t numRead; + + readAgain: + errno = 0; + numRead = read(fd, buf, count); + + if (numRead < 0 && errno == EINTR) + goto readAgain; + + /* + * If read() succeeds, then restore the old errno to avoid clearing errors + * on behalf of the caller. If it fails, then leave errno alone, since + * that's what read normally does anyway. + */ + if (errno == 0) + errno = save_errno; + + return numRead; + } + + /* + * Print input data, using code taken from syslogger.c but stripped of most of + * its more interesting functionality except stepping through the input buffer + * and printing it in a more palatable human-readable format. + */ + static void + printInput(char *logBuf, int *logBufLen) + { + char *cursor = logBuf; + + /* While there is enough data for a header, process it */ + while (*logBufLen >= PIPE_HEADER_SIZE) + { + char *nuls = cursor; + uint16 *len = (void *) (logBuf + 2); + int32 *pid = (void *) (logBuf + 4); + char *fmt = logBuf + 8; + char *payload = logBuf + 9; + + /* + * Sometimes, non-protocol traffic (e.g. libraries that write directly + * stderr) end up piped out of a Postgres process. Here, detect the + * Postgres format as obeyed by ereport/elog and handle if possible. + * The header looks like this: + * + * [NUL] [NUL] [DATALEN]*2 [PID Integer Fragment]*4 [t|T|f|F] + * + * The last byte deserves more explanation: + * + * * If capitalized, this is a CSV formatted record. If lower case, + * the log record respects the user-specified or default + * formatting. + * + * * If 't' or 'T', then this is the last fragment (termination) for + * a log message. + * + * * If 'f' or 'F', then this is a fragment that has a continuation + * yet to come. + * + * This also means the minimum protocol-abiding traffic may be nine + * bytes long on read(), and at maximum can be PIPE_CHUNK_SIZE, since + * protocol traffic relies on the atomic nature of fragmenting into + * PIPE_CHUNK_SIZE pieces. + */ + if (nuls[0] == '\0' && nuls[1] == '\0' && + *len > 0 && *len <= PIPE_MAX_PAYLOAD && + *pid != 0 && + (*fmt == 't' || *fmt == 'f' || + *fmt == 'T' || *fmt == 'F')) + { + const int chunkLen = PIPE_HEADER_SIZE + *len; + + printf("LEN=%d PID=%d FMT=%c %*s\n", *len, *pid, *fmt, + *len, payload); + + /* Finished processing this chunk */ + cursor += chunkLen; + *logBufLen -= chunkLen; + } + else + { + int protoCur; + + /* + * Process non-protocol data, but just in case, look for the + * beginning of some protocol traffic and re-start the formatting + * routine if that happens. + * + * It is expected that in many scenarios, a non-protocol message + * will arrive all in one read(), and we want to respect the read() + * boundary if possible. + * + * NB: Skip looking at the first byte, because the previous branch + * would have already spotted a valid chunk that is aligned + * properly, and not found inside some arbitrary data. + */ + for (protoCur = 1; protoCur < *logBufLen; protoCur += 1) + { + if (cursor[protoCur] == '\0') + break; + } + + printf("LEN= PID= FMT= %*s\n", protoCur, cursor); + + cursor += protoCur; + *logBufLen -= protoCur; + } + } + + /* Don't have a full chunk, so left-align what remains in the buffer */ + if (*logBufLen > 0 && cursor != logBuf) + memmove(logBuf, cursor, *logBufLen); + } + + int + main(void) + { + char buf[2 * PIPE_CHUNK_SIZE]; + int bufLen = 0; + + while (1) + { + int numRead; + + numRead = safe_read(0, buf, sizeof buf); + + /* Handle EOF */ + if (numRead == 0) + return 0; + + /* Exit if read doesn't work for whatever reason */ + if (numRead < 0) + return 1; + + bufLen += numRead; + + printInput(buf, &bufLen); + fflush(stdout); + + } + } *** /dev/null --- b/contrib/pg_logcollectdup/pg_logcollectdup.c *************** *** 0 **** --- 1,263 ---- + /* + * pg_logcollectdup.c + * + * Implements a module to be loaded via shared_preload_libraries that, should + * "logcollectdup.destination" be set in postgresql.conf and the log collector + * ("syslogger") be enabled will allow a copy of the log collection protocol + * traffic to be forwarded to file system path of once's choice. It is + * suggested that this path is most useful if it is a mkfifo named pipe, so + * that a completely seperate program can handle the protocol traffic. + */ + + #include "postgres.h" + + #include + #include + + #include "funcapi.h" + #include "postmaster/syslogger.h" + #include "utils/guc.h" + + PG_MODULE_MAGIC; + + /* GUC-configured destination of the log pages */ + static char *destination; + + static ProcessLogCollect_hook_type prev_ProcessLogCollect = NULL; + + static void openDestFd(char *dest, int *currentFd); + static void closeDestFd(int *currentFd); + static void logcollectdup_ProcessLogCollect(char *buf, int len); + static void call_ProcessLogCollect(char *buf, int len); + + /* + * File descriptor that log pages are written to. Is re-set if a + * write fails. + */ + static int currentFd = -1; + + void _PG_init(void); + void _PG_fini(void); + + /* + * _PG_init() - library load-time initialization + * + * DO NOT make this static nor change its name! + * + * Init the module, all we have to do here is getting our GUC + */ + void + _PG_init(void) { + PG_TRY(); + { + destination = GetConfigOptionByName( + "logcollectdup.destination", NULL); + } + PG_CATCH(); + { + DefineCustomStringVariable("logcollectdup.destination", + "Path send log collector bytes to", + "", + &destination, + "", + PGC_SUSET, + GUC_NOT_IN_SAMPLE, + NULL, + NULL, + NULL); + EmitWarningsOnPlaceholders("logcollectdup.destination"); + } + PG_END_TRY(); + + prev_ProcessLogCollect = ProcessLogCollect_hook; + ProcessLogCollect_hook = logcollectdup_ProcessLogCollect; + } + + /* + * Open the destination file descriptor for writing, refusing to return until + * there is success, writing the fd value into *fd. + * + * That may sound extreme, but considering that the log collector would also + * cause logging processes to block were it to halt or close and the whole + * point of this module is to allow some other process to obtain a copy of the + * protocol traffic, it seems reasonable. + */ + static void + openDestFd(char *dest, int *fd) + { + const int save_errno = errno; + struct stat stat; + + /* Spin until the pipe can be opened */ + while (dest != NULL && *fd < 0) + { + errno = 0; + *fd = open(dest, O_WRONLY); + + if (errno != 0) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("pg_logcollectdup cannot open destination"), + errdetail("The open() request failed with " + "the message: %s.", strerror(errno)))); + + /* + * Wait a while to not spin too aggressively while things are + * messed up. + */ + sleep(1); + } + + /* Must have a valid file descriptor here */ + Assert(*fd >= 0); + + /* + * Check if the opened file descriptor is a pipe. If it isn't, + * whine, close, and invalidate the file descriptor. + */ + fstat(*fd, &stat); + + if (!S_ISFIFO(stat.st_mode)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("pg_logcollectdup only supports writing into " + "named pipes"))); + closeDestFd(fd); + } + + errno = save_errno; + } + + /* + * Close the passed file descriptor and invalidate it. + */ + static void + closeDestFd(int *fd) + { + const int save_errno = errno; + + do + { + errno = 0; + + /* + * Ignore errors except EINTR: other than EINTR, there is no + * obvious handling one can do from a failed close() that matters + * in this case. + */ + close(*fd); + + if (errno == EINTR) + continue; + + if (errno == EBADF) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("pg_logcollectdup attempted to close an " + "invalid file descriptor"), + errdetail("The file descriptor that failed a close " + "attempt was %d, and it failed with " + "the reason: %s.", + *fd, strerror(errno)))); + + /* Exit the EINTR retry loop */ + *fd = -1; + } while (*fd >= 0); + + errno = save_errno; + } + + static void + logcollectdup_ProcessLogCollect(char *buf, int len) + { + int save_errno = errno; + int bytesWritten; + + do + { + if (destination == NULL && currentFd < 0) + { + /* + * No destination defined, and no file descriptor open; in this + * case this extension was loaded but not configured, so just exit. + */ + goto exit; + } + else if (destination != NULL && currentFd < 0) + { + /* + * Destination defined, but no file descriptor open yet. Open the + * file descriptor very insistently; when this returns it must be + * open, which also means backends that need to log *will block* + * until this succeeds. + */ + openDestFd(destination, ¤tFd); + } + else if (destination == NULL && currentFd >= 0) + { + /* + * Destination undefined, but a file descriptor is still open. + * This can be the result of a SIGHUP/configuration change, so + * close and invalidate the file descriptor. + * + * Invalidates currentFd, continuing the retry loop. + */ + closeDestFd(¤tFd); + } + } while (currentFd < 0); + + writeAgain: + errno = 0; + bytesWritten = write(currentFd, buf, len); + + /* + * Given PIPE_BUF atomicity, only expect failed writes or complete + * writes. + */ + + Assert(bytesWritten < 0 || bytesWritten == len); + if (bytesWritten < 0) + { + if (errno == EINTR) + goto writeAgain; + + Assert(errno != 0); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("pg_logcollectdup cannot write to pipe"), + errdetail("The write failed with the message: %s", + strerror(errno)))); + + /* + * Try very hard to toss out the old file descriptor and get a new one + * when things go awry. + */ + closeDestFd(¤tFd); + openDestFd(destination, ¤tFd); + goto writeAgain; + } + + exit: + errno = save_errno; + call_ProcessLogCollect(buf, len); + } + + static void + call_ProcessLogCollect(char *buf, int len) + { + if (prev_ProcessLogCollect != NULL) + prev_ProcessLogCollect(buf, len); + else + standard_ProcessLogCollect(buf, len); + } + + /* + * Module unload callback + */ + void + _PG_fini(void) + { + /* Uninstall hook */ + ProcessLogCollect_hook = prev_ProcessLogCollect; + } *** /dev/null --- b/contrib/pg_logcollectdup/pg_logcollectdup.control *************** *** 0 **** --- 1,4 ---- + comment = 'forward log collector pipe traffic to a path' + default_version = '1.0' + module_pathname = '$libdir/pg_logcollectdup' + relocatable = true *** a/src/backend/postmaster/syslogger.c --- b/src/backend/postmaster/syslogger.c *************** *** 64,69 **** --- 64,70 ---- */ #define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE) + ProcessLogCollect_hook_type ProcessLogCollect_hook = NULL; /* * GUC parameters. Logging_collector cannot be changed after postmaster *************** *** 460,465 **** SysLoggerMain(int argc, char *argv[]) --- 461,471 ---- bytesRead = read(syslogPipe[0], logbuffer + bytes_in_logbuffer, sizeof(logbuffer) - bytes_in_logbuffer); + + if (ProcessLogCollect_hook != NULL) + ProcessLogCollect_hook(logbuffer + bytes_in_logbuffer, + bytesRead); + if (bytesRead < 0) { if (errno != EINTR) *************** *** 768,773 **** syslogger_parseArgs(int argc, char *argv[]) --- 774,785 ---- * -------------------------------- */ + void + standard_ProcessLogCollect(char *buf, int len) + { + /* No-op for now */ + } + /* * Process data received through the syslogger pipe. * *** a/src/include/postmaster/syslogger.h --- b/src/include/postmaster/syslogger.h *************** *** 14,19 **** --- 14,23 ---- #include /* for PIPE_BUF */ + /* Hook for plugins to process logs in process_pipe_input */ + typedef void (*ProcessLogCollect_hook_type) (char *buf, int len); + extern PGDLLIMPORT ProcessLogCollect_hook_type ProcessLogCollect_hook; + extern void standard_ProcessLogCollect(char *buf, int len); /* * Primitive protocol structure for writing to syslogger pipe(s). The idea