diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6af19bd..0170df3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2607,6 +2607,21 @@ SELECT * FROM parent WHERE key = 2400;
+
+ degree-of-parallelism (integer)
+
+ degree-of-parallelism> configuration parameter
+
+
+
+ Sets the maximum degree of paralleism (DOP) to be used by each backend
+ server. This is the maximum number of process than can be created in
+ order to do work in parallel. Currently this is only used by
+ quicksort.
+
+
+
+
cursor_tuple_fraction (floating point)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index db61569..1ef6289 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -205,6 +205,8 @@ bool enable_bonjour = false;
char *bonjour_name;
bool restart_after_crash = true;
+extern int degree_of_parallelism = 1;
+
/* PIDs of special child processes; 0 when not running */
static pid_t StartupPID = 0,
BgWriterPID = 0,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index bd29c5d..ee13110 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -34,6 +34,8 @@
#include
#include
#include
+#include
+#include
#include "access/transam.h"
#include "access/xact.h"
@@ -57,6 +59,10 @@ bool log_lock_waits = false;
/* Pointer to this process's PGPROC struct, if any */
PGPROC *MyProc = NULL;
+/* These fields are for inter-process parallelism. */
+extern int sem_id_dop; /* Semaphore just to access the DOP counter. */
+extern int *shm_dop; /* Current degree of parallelism, the DOP counter. */
+
/*
* This spinlock protects the freelist of recycled PGPROC structures.
* We cannot use an LWLock because the LWLock manager depends on already
@@ -242,6 +248,13 @@ InitProcess(void)
volatile PROC_HDR *procglobal = ProcGlobal;
int i;
+ /* A structure for initialiting the DOP semaphore. */
+ union semun {
+ int val;
+ struct semid_ds *buf;
+ ushort *array;
+ } argument;
+
/*
* ProcGlobal should be set up already (if we are a backend, we inherit
* this by fork() or EXEC_BACKEND mechanism from the postmaster).
@@ -332,6 +345,17 @@ InitProcess(void)
SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false;
+ /* FIXME: handle error */
+ sem_id_dop = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
+ argument.val = 1;
+ semctl(sem_id_dop, 0, SETVAL, argument); /* FIXME: handle error */
+ shm_dop = shmat(sem_id_dop, NULL, 0); /* FIXME: handle error */
+ /*
+ * Initialize the DOP to 1 because there is always 1 process active, which
+ * is the backend that was started to handle the connection.
+ */
+ *shm_dop = 1;
+
/*
* We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly
@@ -712,6 +736,10 @@ ProcKill(int code, Datum arg)
/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
if (AutovacuumLauncherPid != 0)
kill(AutovacuumLauncherPid, SIGUSR2);
+
+ /* Clean up the semaphore and shared memory used for the DOP variables. */
+ shmdt(shm_dop); /* FIXME: handle error */
+ shmctl(sem_id_dop, IPC_RMID, NULL); /* FIXME: handle error */
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b209128..0d23b1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -118,6 +118,7 @@ extern char *temp_tablespaces;
extern bool synchronize_seqscans;
extern bool fullPageWrites;
extern int ssl_renegotiation_limit;
+extern int degree_of_parallelism;
#ifdef TRACE_SORT
extern bool trace_sort;
@@ -2118,6 +2119,15 @@ static struct config_int ConfigureNamesInt[] =
1024, 100, 102400, NULL, NULL
},
+ {
+ {"degree_of_parallelism", PGC_USERSET, QUERY_TUNING_OTHER,
+ gettext_noop("Sets the size maximum processes to be forked per backend."),
+ NULL,
+ },
+ °ree_of_parallelism,
+ 1, 1, 1024, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d31f1a1..bf58f5f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -246,6 +246,8 @@
#join_collapse_limit = 8 # 1 disables collapsing of explicit
# JOIN clauses
+#degree_of_parallelism = 1 # range 1-1024
+
#------------------------------------------------------------------------------
# ERROR REPORTING AND LOGGING
diff --git a/src/port/qsort.c b/src/port/qsort.c
index 7d50fb8..605c51d 100644
--- a/src/port/qsort.c
+++ b/src/port/qsort.c
@@ -1,5 +1,7 @@
/*
- * qsort.c: standard quicksort algorithm
+ * qsort.c: a modified recursive parallel quicksort algorithm
+ *
+ * FIXME: Do we need to set up function for auxiliary processes for sorting?
*
* Modifications from vanilla NetBSD source:
* Add do ... while() macro fix
@@ -43,9 +45,26 @@
* SUCH DAMAGE.
*/
+#include
+#include
+
#include "c.h"
+/*
+ * FIXME:
+ * The only reason for defining these three variables here is because the
+ * quicksort code is built into libpgport and other files, such as
+ * postmaster.c and proc.c do not. I do not know what other scenarios would
+ * be more appropriate.
+ */
+int sem_id_dop;
+int *shm_dop;
+int degree_of_parallelism;
+
+static void get_lock();
+static void release_lock();
+
static char *med3(char *a, char *b, char *c,
int (*cmp) (const void *, const void *));
static void swapfunc(char *, char *, size_t, int);
@@ -91,6 +110,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
+static void
+get_lock()
+{
+ struct sembuf operations[1];
+
+ operations[0].sem_num = 0;
+ operations[0].sem_op = -1;
+ operations[0].sem_flg = 0;
+ /* FIXME: handle error */
+ semop(sem_id_dop, operations, 1);
+}
+
+void static
+release_lock()
+{
+ struct sembuf operations[1];
+
+ operations[0].sem_num = 0;
+ operations[0].sem_op = 1;
+ operations[0].sem_flg = 0;
+ /* FIXME: handle error */
+ semop(sem_id_dop, operations, 1);
+}
+
static char *
med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *))
{
@@ -114,9 +157,18 @@ pg_qsort(void *a, size_t n, size_t es, int (*cmp) (const void *, const void *))
swaptype,
presorted;
-loop:SWAPINIT(a, es);
+ /*
+ * Use -1 to initialize because fork() uses 0 to identify a process as a
+ * child.
+ */
+ int lchild = -1;
+ int rchild = -1;
+ int status; /* For waitpid() only. */
+
+ SWAPINIT(a, es);
if (n < 7)
{
+ /* Insertion sort if less than 7 elements. */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0;
pl -= es)
@@ -124,6 +176,7 @@ loop:SWAPINIT(a, es);
return;
}
presorted = 1;
+ /* Check if sorted. */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
{
if (cmp(pm - es, pm) > 0)
@@ -148,6 +201,7 @@ loop:SWAPINIT(a, es);
}
pm = med3(pl, pm, pn, cmp);
}
+ /* Begin "partition" logic. */
swap(a, pm);
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
@@ -177,19 +231,64 @@ loop:SWAPINIT(a, es);
pb += es;
pc -= es;
}
+ /* End "partition" logic. */
pn = (char *) a + n * es;
r = Min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = Min(pd - pc, pn - pd - es);
vecswap(pb, pn - r, r);
if ((r = pb - pa) > es)
- qsort(a, r / es, es, cmp);
+ {
+ get_lock();
+ if (*shm_dop < degree_of_parallelism)
+ {
+ /* Under the degree limit, fork. */
+ ++*shm_dop;
+ release_lock();
+
+ lchild = fork(); /* FIXME: handle error */
+ if (lchild == 0) {
+ /* The 'left' child starts processing. */
+ qsort(a, r / es, es, cmp);
+
+ get_lock();
+ --*shm_dop;
+ release_lock();
+ exit(0);
+ }
+ }
+ else
+ {
+ release_lock();
+ qsort(a, r / es, es, cmp);
+ }
+ }
if ((r = pd - pc) > es)
{
- /* Iterate rather than recurse to save stack space */
- a = pn - r;
- n = r / es;
- goto loop;
+ get_lock();
+ if (*shm_dop < degree_of_parallelism)
+ {
+ /* Under the degree limit, fork. */
+ ++*shm_dop;
+ release_lock();
+
+ rchild = fork(); /* FIXME: handle error */
+ if (lchild == 0) {
+ /* The 'right' child starts processing. */
+ qsort(pn - r, r / es, es, cmp);
+
+ get_lock();
+ --*shm_dop;
+ release_lock();
+ exit(0);
+ }
+ }
+ else
+ {
+ release_lock();
+ qsort(pn - r, r / es, es, cmp);
+ }
}
-/* qsort(pn - r, r / es, es, cmp);*/
+ waitpid(lchild, &status, 0);
+ waitpid(rchild, &status, 0);
}
diff --git a/src/port/qsort_arg.c b/src/port/qsort_arg.c
index 586f2f6..ca26e93 100644
--- a/src/port/qsort_arg.c
+++ b/src/port/qsort_arg.c
@@ -43,9 +43,19 @@
* SUCH DAMAGE.
*/
+#include
+#include
+
#include "c.h"
+int sem_id_dop;
+int *shm_dop;
+int degree_of_parallelism;
+
+static void get_lock();
+static void release_lock();
+
static char *med3(char *a, char *b, char *c,
qsort_arg_comparator cmp, void *arg);
static void swapfunc(char *, char *, size_t, int);
@@ -91,6 +101,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
+static void
+get_lock()
+{
+ struct sembuf operations[1];
+
+ operations[0].sem_num = 0;
+ operations[0].sem_op = -1;
+ operations[0].sem_flg = 0;
+ /* FIXME: handle error */
+ semop(sem_id_dop, operations, 1);
+}
+
+void static
+release_lock()
+{
+ struct sembuf operations[1];
+
+ operations[0].sem_num = 0;
+ operations[0].sem_op = 1;
+ operations[0].sem_flg = 0;
+ /* FIXME: handle error */
+ semop(sem_id_dop, operations, 1);
+}
+
static char *
med3(char *a, char *b, char *c, qsort_arg_comparator cmp, void *arg)
{
@@ -114,7 +148,11 @@ qsort_arg(void *a, size_t n, size_t es, qsort_arg_comparator cmp, void *arg)
swaptype,
presorted;
-loop:SWAPINIT(a, es);
+ int lchild = -1;
+ int rchild = -1;
+ int status; /* For waitpid() only. */
+
+ SWAPINIT(a, es);
if (n < 7)
{
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
@@ -183,13 +221,57 @@ loop:SWAPINIT(a, es);
r = Min(pd - pc, pn - pd - es);
vecswap(pb, pn - r, r);
if ((r = pb - pa) > es)
- qsort_arg(a, r / es, es, cmp, arg);
+ {
+ get_lock();
+ if (*shm_dop < degree_of_parallelism)
+ {
+ /* Under the degree limit, fork. */
+ ++*shm_dop;
+ release_lock();
+
+ lchild = fork(); /* FIXME: handle error */
+ if (lchild == 0) {
+ /* The 'left' child starts processing. */
+ qsort_arg(a, r / es, es, cmp, arg);
+
+ get_lock();
+ --*shm_dop;
+ release_lock();
+ exit(0);
+ }
+ }
+ else
+ {
+ release_lock();
+ qsort_arg(a, r / es, es, cmp, arg);
+ }
+ }
if ((r = pd - pc) > es)
{
- /* Iterate rather than recurse to save stack space */
- a = pn - r;
- n = r / es;
- goto loop;
+ get_lock();
+ if (*shm_dop < degree_of_parallelism)
+ {
+ /* Under the degree limit, fork. */
+ ++*shm_dop;
+ release_lock();
+
+ rchild = fork(); /* FIXME: handle error */
+ if (lchild == 0) {
+ /* The 'right' child starts processing. */
+ qsort_arg(pn - r, r / es, es, cmp, arg);
+
+ get_lock();
+ --*shm_dop;
+ release_lock();
+ exit(0);
+ }
+ }
+ else
+ {
+ release_lock();
+ qsort_arg(pn - r, r / es, es, cmp, arg);
+ }
}
-/* qsort_arg(pn - r, r / es, es, cmp, arg);*/
+ waitpid(lchild, &status, 0);
+ waitpid(rchild, &status, 0);
}