diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c index e05e97c6de2..759b5b3a82b 100644 --- a/src/test/modules/test_shm_mq/setup.c +++ b/src/test/modules/test_shm_mq/setup.c @@ -18,11 +18,17 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgworker.h" +#include "storage/buffile.h" #include "storage/procsignal.h" #include "storage/shm_toc.h" #include "test_shm_mq.h" #include "utils/memutils.h" +typedef struct +{ + SharedFileSet fileset; +} shared_state; + typedef struct { int nworkers; @@ -32,7 +38,8 @@ typedef struct static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, dsm_segment **segp, test_shm_mq_header **hdrp, - shm_mq **outp, shm_mq **inp); + shm_mq **outp, shm_mq **inp, + BufFile **fdp); static worker_state *setup_background_workers(int nworkers, dsm_segment *seg); static void cleanup_background_workers(dsm_segment *seg, Datum arg); @@ -53,9 +60,10 @@ test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq *outq = NULL; /* placate compiler */ shm_mq *inq = NULL; /* placate compiler */ worker_state *wstate; + BufFile *fd; /* Set up a dynamic shared memory segment. */ - setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); + setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq, &fd); *segp = seg; /* Register background workers. */ @@ -68,6 +76,10 @@ test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, /* Wait for workers to become ready. */ wait_for_workers_to_become_ready(wstate, hdr); + /* Force error processing */ + elog(ERROR, "force error"); + + BufFileClose(fd); /* * Once we reach this point, all workers are ready. We no longer need to * kill them if we die; they'll die on their own as the message queues @@ -88,7 +100,7 @@ test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, dsm_segment **segp, test_shm_mq_header **hdrp, - shm_mq **outp, shm_mq **inp) + shm_mq **outp, shm_mq **inp, BufFile **fdp) { shm_toc_estimator e; int i; @@ -96,6 +108,7 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers, dsm_segment *seg; shm_toc *toc; test_shm_mq_header *hdr; + BufFile *fd; /* Ensure a valid queue size. */ if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) @@ -160,9 +173,18 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers, } } + SharedFileSetInit(&hdr->fileset, seg); +#if PG_VERSION_NUM >= 150000 + fd = BufFileCreateFileSet(&(hdr->fileset.fs), "test_mq_sharefile"); +#else + fd = BufFileCreateShared(&(hdr->fileset), "test_mq_sharefile"); +#endif + + /* Return results to caller. */ *segp = seg; *hdrp = hdr; + *fdp = fd; } /* diff --git a/src/test/modules/test_shm_mq/test_shm_mq.h b/src/test/modules/test_shm_mq/test_shm_mq.h index a6661218347..447c6ba61a7 100644 --- a/src/test/modules/test_shm_mq/test_shm_mq.h +++ b/src/test/modules/test_shm_mq/test_shm_mq.h @@ -15,6 +15,7 @@ #define TEST_SHM_MQ_H #include "storage/dsm.h" +#include "storage/sharedfileset.h" #include "storage/shm_mq.h" #include "storage/spin.h" @@ -32,6 +33,7 @@ typedef struct int workers_total; int workers_attached; int workers_ready; + SharedFileSet fileset; } test_shm_mq_header; /* Set up dynamic shared memory and background workers for test run. */ diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c index 2180776a669..4372ccd81dc 100644 --- a/src/test/modules/test_shm_mq/worker.c +++ b/src/test/modules/test_shm_mq/worker.c @@ -19,12 +19,15 @@ #include "postgres.h" +#include "access/xact.h" #include "miscadmin.h" +#include "storage/buffile.h" #include "storage/ipc.h" #include "storage/procarray.h" #include "storage/shm_mq.h" #include "storage/shm_toc.h" #include "tcop/tcopprot.h" +#include "utils/resowner.h" #include "test_shm_mq.h" @@ -53,6 +56,7 @@ test_shm_mq_main(Datum main_arg) volatile test_shm_mq_header *hdr; int myworkernumber; PGPROC *registrant; + BufFile *fd; /* * Establish signal handlers. @@ -110,6 +114,19 @@ test_shm_mq_main(Datum main_arg) */ attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "worker"); + + /* + * Open shared file. The explicit typecasts are to keep compiler + * happy because hdr is declared as volatile here. + */ + SharedFileSetAttach((SharedFileSet *)&hdr->fileset, seg); +#if PG_VERSION_NUM >= 150000 + fd = BufFileOpenFileSet((FileSet *)&hdr->fileset.fs, "test_mq_sharefile", O_RDONLY, false); +#else + fd = BufFileOpenShared((SharedFileSet *)&hdr->fileset, "test_mq_sharefile", O_RDONLY); +#endif + /* * Indicate that we're fully initialized and ready to begin the main part * of the parallel operation. @@ -133,6 +150,14 @@ test_shm_mq_main(Datum main_arg) /* Do the work. */ copy_messages(inqh, outqh); + BufFileClose(fd); + + /* + * Force interrupt processing. Not sure why, but this makes it easy + * to reproduce the problem. + */ + CHECK_FOR_INTERRUPTS(); + /* * We're done. For cleanliness, explicitly detach from the shared memory * segment (that would happen anyway during process exit, though).