diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c new file mode 100644 index 32ac58f..f5db55a *** a/src/backend/storage/ipc/ipci.c --- b/src/backend/storage/ipc/ipci.c *************** *** 43,48 **** --- 43,49 ---- #include "storage/procsignal.h" #include "storage/sinvaladt.h" #include "storage/spin.h" + #include "utils/signal_demo.h" shmem_startup_hook_type shmem_startup_hook = NULL; *************** CreateSharedMemoryAndSemaphores(bool mak *** 139,144 **** --- 140,146 ---- size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, ProcPipeShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif *************** CreateSharedMemoryAndSemaphores(bool mak *** 243,248 **** --- 245,251 ---- ReplicationOriginShmemInit(); WalSndShmemInit(); WalRcvShmemInit(); + ProcPipeShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c new file mode 100644 index 0abde43..c28b44f *** a/src/backend/storage/ipc/procsignal.c --- b/src/backend/storage/ipc/procsignal.c *************** *** 27,32 **** --- 27,33 ---- #include "storage/sinval.h" #include "tcop/tcopprot.h" + #include "utils/signal_demo.h" /* * The SIGUSR1 signal is multiplexed to support signalling multiple event *************** bool set_latch_on_sigusr1; *** 69,75 **** static ProcSignalSlot *ProcSignalSlots = NULL; static volatile ProcSignalSlot *MyProcSignalSlot = NULL; ! static bool CheckProcSignal(ProcSignalReason reason); static void CleanupProcSignalState(int status, Datum arg); --- 70,76 ---- static ProcSignalSlot *ProcSignalSlots = NULL; static volatile ProcSignalSlot *MyProcSignalSlot = NULL; ! static bool CheckProcSignal(ProcSignalReason reason); static void CleanupProcSignalState(int status, Datum arg); *************** procsignal_sigusr1_handler(SIGNAL_ARGS) *** 296,301 **** --- 297,305 ---- if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_SEND_INFO)) + SendProcessInfoInterrupt(); + if (set_latch_on_sigusr1) SetLatch(MyLatch); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c new file mode 100644 index d917af3..51701b8 *** a/src/backend/tcop/postgres.c --- b/src/backend/tcop/postgres.c *************** ProcessInterrupts(void) *** 2991,2996 **** --- 2991,2999 ---- if (ParallelMessagePending) HandleParallelMessages(); + + if (SendProcessInfoPending) + HandleSendProcessInfo(); } diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile new file mode 100644 index 3ed0b44..864970c *** a/src/backend/utils/adt/Makefile --- b/src/backend/utils/adt/Makefile *************** OBJS = acl.o arrayfuncs.o array_expanded *** 36,42 **** tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \ tsvector.o tsvector_op.o tsvector_parser.o \ txid.o uuid.o varbit.o varchar.o varlena.o version.o \ ! windowfuncs.o xid.o xml.o like.o: like.c like_match.c --- 36,42 ---- tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \ tsvector.o tsvector_op.o tsvector_parser.o \ txid.o uuid.o varbit.o varchar.o varlena.o version.o \ ! windowfuncs.o xid.o xml.o signal_demo.o like.o: like.c like_match.c diff --git a/src/backend/utils/adt/signal_demo.c b/src/backend/utils/adt/signal_demo.c new file mode 100644 index ...5de50b5 *** a/src/backend/utils/adt/signal_demo.c --- b/src/backend/utils/adt/signal_demo.c *************** *** 0 **** --- 1,411 ---- + #include "postgres.h" + + #include "miscadmin.h" + + #include "storage/backendid.h" + #include "storage/ipc.h" + #include "storage/procarray.h" + #include "storage/shm_mq.h" + #include "storage/shm_toc.h" + #include "utils/builtins.h" + #include "utils/resowner.h" + #include "utils/signal_demo.h" + + #include "storage/dsm.h" + + + typedef struct + { + pid_t pss_pid; + pid_t sender_pss_pid; + BackendId sender_backendId; + dsm_handle dsm_seg_handle; + int data_tag; + bool is_valid; + bool is_in_process; + bool is_processed; + } ProcPipeSlot; + + + #define NumProcPipeSlots (MaxBackends + NUM_AUXPROCTYPES) + + #define PG_SEND_PROCESS_INFO_MAGIC 0x79fb2449 + #define SEND_INFO_DEMO_TAG 1 + + static ProcPipeSlot *ProcPipeSlots = NULL; + static volatile ProcPipeSlot *MyProcPipeSlot = NULL; + + volatile bool SendProcessInfoPending = false; + + static char *prepare_demo_data(int length, bool is_active_stmt); + + + Size + ProcPipeShmemSize(void) + { + return NumProcPipeSlots * sizeof(ProcPipeSlot); + } + + void + ProcPipeShmemInit(void) + { + Size size = ProcPipeShmemSize(); + bool found; + + ProcPipeSlots = (ProcPipeSlot *) + ShmemInitStruct("ProcPipeSlots", size, &found); + + if (!found) + MemSet(ProcPipeSlots, 0, size); + } + + static void + CleanupProcPipeSlot(int status, Datum arg) + { + int pss_idx = DatumGetInt32(arg); + volatile ProcPipeSlot *slot; + + slot = &ProcPipeSlots[pss_idx - 1]; + Assert(slot == MyProcPipeSlot); + + MyProcPipeSlot = NULL; + + if (slot->pss_pid != MyProcPid) + { + elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d", + MyProcPid, pss_idx, (int) slot->pss_pid); + return; /* XXX better to zero the slot anyway? */ + } + + slot->pss_pid = 0; + slot->is_valid = false; + } + + void + ProcPipeInit(int pss_idx) + { + volatile ProcPipeSlot *slot; + + slot = &ProcPipeSlots[pss_idx - 1]; + + if (slot->pss_pid != 0) + elog(LOG, "process %d taking over ProcPipe slot %d, but it's not empty", + MyProcPid, pss_idx); + + slot->pss_pid = MyProcPid; + slot->is_valid = false; + + MyProcPipeSlot = slot; + + on_shmem_exit(CleanupProcPipeSlot, Int32GetDatum(pss_idx)); + } + + /* + * Send request to data to targetted process. Result is taken from SendProcSignal + */ + int + SendProcessInfoSignal(pid_t data_sender_pid, BackendId data_sender_backendId, + dsm_handle dsm_seg_handle, int data_tag) + { + MyProcPipeSlot->sender_pss_pid = data_sender_pid; + MyProcPipeSlot->sender_backendId = data_sender_backendId; + MyProcPipeSlot->dsm_seg_handle = dsm_seg_handle; + MyProcPipeSlot->data_tag = data_tag; + MyProcPipeSlot->is_valid = true; + MyProcPipeSlot->is_in_process = false; + MyProcPipeSlot->is_processed = false; + + return SendProcSignal(data_sender_pid, PROCSIG_SEND_INFO, data_sender_backendId); + } + + void + InvalidateProcPipeSlot(void) + { + MyProcPipeSlot->is_valid = false; + } + + bool + is_valid_result(void) + { + return MyProcPipeSlot->is_valid; + } + + + /* + * signal is handled quickly - don't do heavy work here + */ + void + SendProcessInfoInterrupt(void) + { + volatile ProcPipeSlot *slot; + int i; + + for (i = NumProcPipeSlots - 1; i >= 0; i--) + { + slot = &ProcPipeSlots[i]; + if (slot->sender_pss_pid == MyProcPid && slot->is_valid + && !(slot->is_processed || slot->is_in_process)) + { + SendProcessInfoPending = true; + InterruptPending = true; + + SetLatch(MyLatch); + + return; + } + } + } + + static void + RestoreResourceOwner(ResourceOwner prevResourceOwner) + { + if (prevResourceOwner != CurrentResourceOwner) + { + ResourceOwner deleted_ro = CurrentResourceOwner; + + CurrentResourceOwner = prevResourceOwner; + ResourceOwnerDelete(deleted_ro); + } + } + + void + HandleSendProcessInfo(void) + { + volatile ProcPipeSlot *slot; + + while (SendProcessInfoPending) + { + int i; + + SendProcessInfoPending = false; + + for (i = NumProcPipeSlots - 1; i >= 0; i--) + { + slot = &ProcPipeSlots[i]; + + if (slot->sender_pss_pid == MyProcPid && slot->is_valid && !slot->is_processed) + { + ResourceOwner prevRO = CurrentResourceOwner; + + dsm_segment *seg = NULL; + shm_toc *toc = NULL; + shm_mq_handle *output; + shm_mq *mq; + int length, *length_ptr; + + /* fast leave when tag is unsupported */ + if (slot->data_tag != SEND_INFO_DEMO_TAG) + { + slot->is_processed = true; + slot->is_valid = false; + continue; + } + + /* Ensure valid resource owner for access to dsm */ + if (CurrentResourceOwner == NULL) + CurrentResourceOwner = ResourceOwnerCreate(NULL, "HandlerSendProcessInfo"); + + seg = dsm_attach(slot->dsm_seg_handle); + if (seg == NULL) + { + elog(LOG, "unable map dynamic memory segment"); + slot->is_processed = true; + slot->is_valid = false; + + RestoreResourceOwner(prevRO); + + continue; + } + + toc = shm_toc_attach(PG_SEND_PROCESS_INFO_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + { + elog(LOG, "bad magic number in passed dynamic memory segment"); + + dsm_detach(seg); + slot->is_processed = true; + slot->is_valid = false; + + RestoreResourceOwner(prevRO); + + continue; + } + + mq = shm_toc_lookup(toc, 0); + output = shm_mq_attach(mq, seg, NULL); + + length_ptr = shm_toc_lookup(toc, 1); + length = *length_ptr; + + /* do potentialy heavy and slow work */ + PG_TRY(); + { + char *data; + shm_mq_result res; + + data = prepare_demo_data(length, prevRO == CurrentResourceOwner); + res = shm_mq_send(output, length + 1, data, false); + if (res != SHM_MQ_SUCCESS) + { + elog(LOG, "unable to send data to recipient"); + + dsm_detach(seg); + slot->is_processed = true; + slot->is_valid = false; + } + + slot->is_processed = true; + shm_mq_detach(mq); + dsm_detach(seg); + + RestoreResourceOwner(prevRO); + } + PG_CATCH(); + { + slot->is_processed = true; + slot->is_valid = false; + + if (seg != NULL) + dsm_detach(seg); + + RestoreResourceOwner(prevRO); + + PG_RE_THROW(); + } + PG_END_TRY(); + + } + } + } + } + + /* + * generate some content that will be sended to recipient + */ + static char * + prepare_demo_data(int length, bool is_active_stmt) + { + char *buffer, *ptr; + char *chars = "0123456789"; + int padding = 0; + + length = length < 100 ? 100 : length; + + buffer = palloc(length + 1); + + if (!is_active_stmt) + { + sprintf(buffer, "idle sender pid: %d", MyProcPid); + return buffer; + } + + sprintf(buffer, "sender pid: %d\n", MyProcPid); + + ptr = buffer + strlen(buffer); + length -= strlen(buffer); + + while (length > 1) + { + *ptr++ = chars[padding++ % 10]; + + length--; + if (padding % 30 == 0) + { + *ptr++ = '\n'; + length--; + } + } + + *ptr++ = '#'; + *ptr = '\0'; + + return buffer; + } + + /* + * Send custom signal and returns string generated by signaled process. + */ + Datum + signal_process(PG_FUNCTION_ARGS) + { + int pid = PG_GETARG_INT32(0); + int length = PG_GETARG_INT32(1); + dsm_segment *seg; + shm_toc_estimator e; + shm_toc *toc; + shm_mq *mq; + shm_mq_handle *input; + shm_mq_result res; + Size segsize; + int *param_ptr; + text *result; + PGPROC *proc; + Size received_data_length; + char *data; + + if (pid == MyProcPid) + elog(ERROR, "cannot to send signal to self"); + + proc = BackendPidGetProc(pid); + if (proc == NULL) + elog(ERROR, "invalid pid"); + + if (length < 30) + elog(ERROR, "second parameter (length: %d) should be >= 30", length); + + + /* prepare shared dsm segment */ + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, 1024); + shm_toc_estimate_chunk(&e, sizeof(int)); + shm_toc_estimate_keys(&e, 2); + segsize = shm_toc_estimate(&e); + + seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + elog(ERROR, "cannot to create dsm segment"); + + PG_TRY(); + { + toc = shm_toc_create(PG_SEND_PROCESS_INFO_MAGIC, dsm_segment_address(seg), segsize); + if (toc == NULL) + elog(ERROR, "cannot to create toc"); + + /* prepare basic structures */ + mq = shm_mq_create(shm_toc_allocate(toc, 1024), 1024); + shm_mq_set_receiver(mq, MyProc); + shm_mq_set_sender(mq, proc); + shm_toc_insert(toc, 0, mq); + + param_ptr = (int *) shm_toc_allocate(toc, sizeof(int)); + *param_ptr = length; + shm_toc_insert(toc, 1, param_ptr); + + if (SendProcessInfoSignal(pid, proc->backendId , dsm_segment_handle(seg), SEND_INFO_DEMO_TAG)) + elog(ERROR, "could not send signal to process"); + + /* try to read from queue */ + input = shm_mq_attach(mq, seg, NULL); + res = shm_mq_receive(input, &received_data_length, (void **) &data, false); + if (res != SHM_MQ_SUCCESS) + elog(ERROR, "could not receive message"); + + if (!is_valid_result()) + elog(ERROR, "sender doesn't send valid data"); + + result = cstring_to_text_with_len(data, received_data_length - 1); + + InvalidateProcPipeSlot(); + shm_mq_detach(mq); + dsm_detach(seg); + } + PG_CATCH(); + { + InvalidateProcPipeSlot(); + dsm_detach(seg); + PG_RE_THROW(); + } + PG_END_TRY(); + + PG_RETURN_TEXT_P(result); + } diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c new file mode 100644 index 7b19714..5a36936 *** a/src/backend/utils/init/postinit.c --- b/src/backend/utils/init/postinit.c *************** *** 60,66 **** #include "utils/syscache.h" #include "utils/timeout.h" #include "utils/tqual.h" ! static HeapTuple GetDatabaseTuple(const char *dbname); static HeapTuple GetDatabaseTupleByOid(Oid dboid); --- 60,66 ---- #include "utils/syscache.h" #include "utils/timeout.h" #include "utils/tqual.h" ! #include "utils/signal_demo.h" static HeapTuple GetDatabaseTuple(const char *dbname); static HeapTuple GetDatabaseTupleByOid(Oid dboid); *************** InitPostgres(const char *in_dbname, Oid *** 588,593 **** --- 588,596 ---- /* Now that we have a BackendId, we can participate in ProcSignal */ ProcSignalInit(MyBackendId); + /* And same for ProcPipe */ + ProcPipeInit(MyBackendId); + /* * Also set up timeout handlers needed for backend operation. We need * these in every case except bootstrap. diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h new file mode 100644 index ddf7c67..ae25559 *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** DATA(insert OID = 2171 ( pg_cancel_backe *** 3134,3139 **** --- 3134,3141 ---- DESCR("cancel a server process' current query"); DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ )); DESCR("terminate a server process"); + DATA(insert OID = 4066 ( signal_process PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ signal_process _null_ _null_ _null_ )); + DESCR("cancel a server process' current query"); DATA(insert OID = 2172 ( pg_start_backup PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ )); DESCR("prepare for taking an online backup"); DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 3220 "" _null_ _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ )); diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h new file mode 100644 index e0cc69f..3b9a7bf *** a/src/include/miscadmin.h --- b/src/include/miscadmin.h *************** *** 80,85 **** --- 80,86 ---- extern PGDLLIMPORT volatile bool InterruptPending; extern PGDLLIMPORT volatile bool QueryCancelPending; extern PGDLLIMPORT volatile bool ProcDiePending; + extern PGDLLIMPORT volatile bool SendProcessInfoPending; extern volatile bool ClientConnectionLost; *************** extern void SwitchBackToLocalLatch(void) *** 324,329 **** --- 325,333 ---- extern bool superuser(void); /* current user is superuser */ extern bool superuser_arg(Oid roleid); /* given user is superuser */ + /* utils/misc/signal_demo.c */ + extern void HandleSendProcessInfo(void); + /***************************************************************************** * pmod.h -- * diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h new file mode 100644 index af1a0cd..0d692a7 *** a/src/include/storage/procsignal.h --- b/src/include/storage/procsignal.h *************** typedef enum *** 32,37 **** --- 32,38 ---- PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ + PROCSIG_SEND_INFO, /* internal info request */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE, diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h new file mode 100644 index fc1679e..7310dc0 *** a/src/include/utils/builtins.h --- b/src/include/utils/builtins.h *************** extern Datum pg_prepared_statement(PG_FU *** 1264,1267 **** --- 1264,1270 ---- /* utils/mmgr/portalmem.c */ extern Datum pg_cursor(PG_FUNCTION_ARGS); + /* utils/signal_demo.c */ + extern Datum signal_process(PG_FUNCTION_ARGS); + #endif /* BUILTINS_H */ diff --git a/src/include/utils/signal_demo.h b/src/include/utils/signal_demo.h new file mode 100644 index ...0c951c4 *** a/src/include/utils/signal_demo.h --- b/src/include/utils/signal_demo.h *************** *** 0 **** --- 1,14 ---- + #include "storage/dsm.h" + + extern Size ProcPipeShmemSize(void); + extern void ProcPipeShmemInit(void); + extern void ProcPipeInit(int pss_idx); + + extern int SendProcessInfoSignal(pid_t data_sender_pid, BackendId data_sender_backendId, + dsm_handle dsm_seg_handle, int data_tag); + + extern void SendProcessInfoInterrupt(void); + extern void HandleSendProcessInfo(void); + + extern void InvalidateProcPipeSlot(void); + extern bool is_valid_result(void);