From 119d7ed341477b1c5f644ae289bf8cb01ad5bc5a Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Wed, 31 Oct 2018 00:12:39 +0100 Subject: [PATCH 1/2] Add infrastructure for signalling backends with payload This adds an API to include a payload consisting of a message, an elevel and a custom sqlerrcode, when signalling a backend. The main usecase is to be able to include context to the user as to why the connection is terminated or query cancelled. --- src/backend/storage/ipc/Makefile | 2 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/signal_message.c | 286 +++++++++++++++++++++++++++++++ src/backend/utils/init/postinit.c | 2 + src/include/storage/signal_message.h | 29 ++++ 5 files changed, 321 insertions(+), 1 deletion(-) create mode 100644 src/backend/storage/ipc/signal_message.c create mode 100644 src/include/storage/signal_message.h diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index 49e7c9f15e..fd628af59f 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -10,6 +10,6 @@ include $(top_builddir)/src/Makefile.global OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o signalfuncs.o \ - sinval.o sinvaladt.o standby.o + signal_message.o sinval.o sinvaladt.o standby.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 0c86a581c0..fb7c1a0b49 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -33,6 +33,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/origin.h" +#include "storage/signal_message.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, BackendRandomShmemSize()); + size = add_size(size, BackendSignalFeedbackShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SyncScanShmemInit(); AsyncShmemInit(); BackendRandomShmemInit(); + BackendSignalFeedbackShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/signal_message.c b/src/backend/storage/ipc/signal_message.c new file mode 100644 index 0000000000..596f9584cc --- /dev/null +++ b/src/backend/storage/ipc/signal_message.c @@ -0,0 +1,286 @@ +/*------------------------------------------------------------------------- + * + * signal_message.c + * Functions for sending a message to a signalled backend + * + * This file contains routines to handle registering an optional message when + * cancelling, or terminating, a backend as well changing the sqlerrcode used. + * The combined payload of message/errcode is referred to as feedback. The + * message will be stored in shared memory and is limited to MAX_CANCEL_MSG + * characters including the NULL terminator. + * + * Access to the feedback slots is protected by spinlocks. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/ipc/signal_message.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/shmem.h" +#include "storage/signal_message.h" +#include "storage/spin.h" + + +/* + * Structure for registering a feedback payload to be sent to a cancelled, or + * terminated backend. Each backend is registered per pid in the array which is + * indexed by Backend ID. Reading and writing the message is protected by a + * per-slot spinlock. + */ +typedef struct +{ + pid_t dest_pid; /* The pid of the process being signalled */ + pid_t src_pid; /* The pid of the processing signalling */ + slock_t mutex; /* Per-slot protection */ + char message[MAX_CANCEL_MSG]; /* Message to send to signalled backend */ + int orig_length; /* Length of the message as passed by the user, + * if this length exceeds MAX_CANCEL_MSG it will + * be truncated but we store the original length + * in order to be able to convey truncation */ + int sqlerrcode; /* errcode to use when signalling backend */ + int elevel; /* elevel to use when signalling backend */ +} BackendSignalFeedbackShmemStruct; + +static BackendSignalFeedbackShmemStruct *BackendSignalFeedbackSlots = NULL; +static volatile BackendSignalFeedbackShmemStruct *MyCancelSlot = NULL; +static void CleanupBackendSignalFeedback(int status, Datum argument); +static int backend_feedback(pid_t backend_pid, char *message, int sqlerrcode, + int elevel); + +/* + * Return the required size for the cancelation feedback Shmem area. + */ +Size +BackendSignalFeedbackShmemSize(void) +{ + return MaxBackends * sizeof(BackendSignalFeedbackShmemStruct); +} + +/* + * Create and initialize the Shmem structure for holding the feedback, the + * bookkeeping for them and the spinlocks associated. + */ +void +BackendSignalFeedbackShmemInit(void) +{ + Size size = BackendSignalFeedbackShmemSize(); + bool found; + int i; + + BackendSignalFeedbackSlots = (BackendSignalFeedbackShmemStruct *) + ShmemInitStruct("BackendSignalFeedbackSlots", size, &found); + + if (!found) + { + MemSet(BackendSignalFeedbackSlots, 0, size); + + for (i = 0; i < MaxBackends; i++) + SpinLockInit(&(BackendSignalFeedbackSlots[i].mutex)); + } +} + +/* + * Set up the slot for the current backend_id + */ +void +BackendSignalFeedbackInit(int backend_id) +{ + volatile BackendSignalFeedbackShmemStruct *slot; + + slot = &BackendSignalFeedbackSlots[backend_id - 1]; + + slot->message[0] = '\0'; + slot->orig_length = 0; + slot->sqlerrcode = 0; + slot->elevel = 0; + slot->dest_pid = MyProcPid; + + MyCancelSlot = slot; + + on_shmem_exit(CleanupBackendSignalFeedback, Int32GetDatum(backend_id)); +} + +/* + * Ensure that the slot is purged and emptied at exit. Any message gets + * overwritten with null chars to avoid risking exposing a message intended for + * another backend to a new backend. + */ +static void +CleanupBackendSignalFeedback(int status, Datum argument) +{ + int backend_id = DatumGetInt32(argument); + volatile BackendSignalFeedbackShmemStruct *slot; + + slot = &BackendSignalFeedbackSlots[backend_id - 1]; + + Assert(slot == MyCancelSlot); + + MyCancelSlot = NULL; + + if (slot->orig_length > 0) + MemSet(slot->message, '\0', sizeof(slot->message)); + + slot->orig_length = 0; + slot->sqlerrcode = 0; + slot->elevel = 0; + slot->dest_pid = 0; + slot->src_pid = 0; +} + +/* + * Set a message for the cancellation of the backend with the specified pid, + * using the default sqlerrcode. + */ +int +SetBackendCancelMessage(pid_t backend_pid, char *message) +{ + return backend_feedback(backend_pid, message, ERRCODE_QUERY_CANCELED, + ERROR); +} + +/* + * Set a message for the termination of the backend with the specified pid, + * using the default sqlerrcode. + */ +int +SetBackendTerminationMessage(pid_t backend_pid, char *message) +{ + return backend_feedback(backend_pid, message, ERRCODE_ADMIN_SHUTDOWN, + FATAL); +} + +/* + * Set both a message, elevel and a sqlerrcode for use when signalling the + * backend with the specified pid. + */ +int +SetBackendSignalFeedback(pid_t backend_pid, char *message, int sqlerrcode, + int elevel) +{ + return backend_feedback(backend_pid, message, sqlerrcode, elevel); +} + +/* + * Sets a cancellation message for the backend with the specified pid, and + * returns zero on success. If the backend isn't found, or no message is + * passed, 1 is returned. If two backends collide in setting a message, the + * existing message will be overwritten by the last one in. The message will + * be truncated to fit within MAX_CANCEL_MSG bytes. + */ +static int +backend_feedback(pid_t backend_pid, char *message, int sqlerrcode, int elevel) +{ + int i; + int len; + + if (!message) + return 1; + + len = pg_mbcliplen(message, strlen(message), MAX_CANCEL_MSG - 1); + + for (i = 0; i < MaxBackends; i++) + { + BackendSignalFeedbackShmemStruct *slot = &BackendSignalFeedbackSlots[i]; + + if (slot->dest_pid != 0 && slot->dest_pid == backend_pid) + { + SpinLockAcquire(&slot->mutex); + if (slot->dest_pid != backend_pid) + { + SpinLockRelease(&slot->mutex); + return 1; + } + + memcpy(slot->message, message, len); + slot->orig_length = pg_mbstrlen(message); + slot->sqlerrcode = sqlerrcode; + slot->elevel = elevel; + slot->src_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + if (len != strlen(message)) + ereport(NOTICE, + (errmsg("message is too long and has been truncated"))); + return 0; + } + } + + return 1; +} + +/* + * HasBackendSignalFeedback + * Test if there is a backend signalling feedback to consume + * + * Test whether there is feedback registered for the current backend that can + * be consumed and presented to the user. It isn't strictly required to call + * this function prior to consuming a potential message, but since consuming it + * will clear it there can be cases where one would like to peek first. + */ +bool +HasBackendSignalFeedback(void) +{ + volatile BackendSignalFeedbackShmemStruct *slot = MyCancelSlot; + bool has_message = false; + + if (slot != NULL) + { + SpinLockAcquire(&slot->mutex); + has_message = ((slot->orig_length > 0) && (slot->sqlerrcode != 0)); + SpinLockRelease(&slot->mutex); + } + + return has_message; +} + +/* + * ConsumeBackendSignalFeedback + * Read and clear backend signalling feedback + * + * Return the configured signal feedback in buffer, which is buf_len bytes in + * size. The original length of the message is returned, or zero in case no + * message was found. If the returned length exceeds that of Min(buf_len, + * MAX_CANCEL_MSG), then truncation has been performed. The feedback (message + * and errcode) is cleared on consumption. There is no point in passing a + * buffer larger than MAX_CANCEL_MSG as that is the upper bound on what will be + * stored in the slot. + */ +int +ConsumeBackendSignalFeedback(char *buffer, size_t buf_len, int *sqlerrcode, + pid_t *pid, int *elevel) +{ + volatile BackendSignalFeedbackShmemStruct *slot = MyCancelSlot; + int msg_length = 0; + + if (slot != NULL && slot->orig_length > 0) + { + SpinLockAcquire(&slot->mutex); + if (buffer && buf_len) + strlcpy(buffer, (const char *) slot->message, buf_len); + msg_length = slot->orig_length; + if (sqlerrcode) + *sqlerrcode = slot->sqlerrcode; + if (pid) + *pid = slot->src_pid; + if (elevel) + *elevel = slot->elevel; + + slot->orig_length = 0; + /* Avoid risking to leak any part of a previously set message */ + MemSet(slot->message, '\0', sizeof(slot->message)); + slot->sqlerrcode = 0; + slot->elevel = 0; + SpinLockRelease(&slot->mutex); + } + + return msg_length; +} diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 4f1d2a0d28..4382691420 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -40,6 +40,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" #include "replication/walsender.h" +#include "storage/signal_message.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -780,6 +781,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, PerformAuthentication(MyProcPort); InitializeSessionUserId(username, useroid); am_superuser = superuser(); + BackendSignalFeedbackInit(MyBackendId); } /* diff --git a/src/include/storage/signal_message.h b/src/include/storage/signal_message.h new file mode 100644 index 0000000000..e0f834f2f5 --- /dev/null +++ b/src/include/storage/signal_message.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * signal_message.h + * Declarations for sending a message to a signalled backend + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * + * src/include/storage/signal_message.h + * + *------------------------------------------------------------------------- + */ +#ifndef SIGNAL_MESSAGE_H +#define SIGNAL_MESSAGE_H + +#define MAX_CANCEL_MSG 128 + +extern Size BackendSignalFeedbackShmemSize(void); +extern void BackendSignalFeedbackShmemInit(void); +extern void BackendSignalFeedbackInit(int backend_id); + +extern int SetBackendCancelMessage(pid_t backend, char *message); +extern int SetBackendTerminationMessage(pid_t backend, char *message); +extern int SetBackendSignalFeedback(pid_t backend, char *message, + int sqlerrcode, int elevel); +extern bool HasBackendSignalFeedback(void); +extern int ConsumeBackendSignalFeedback(char *msg, size_t len, int *sqlerrcode, + pid_t *pid, int *elevel); + +#endif /* SIGNAL_MESSAGE_H */ -- 2.14.1.145.gb3622a4ee