diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6af19bd..0170df3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2607,6 +2607,22 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-degree-of-parallelism" xreflabel="degree_of_parallelism">
+      <term><varname>degree_of_parallelism</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>degree_of_parallelism</varname> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Sets the maximum degree of parallelism (DOP) to be used by each backend
+        server.  This is the maximum number of processes, counting the backend
+        itself, that can be at work in parallel; the default of 1 therefore
+        means that no extra processes are created.  Currently this is only
+        used by quicksort.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-cursor-tuple-fraction" xreflabel="cursor_tuple_fraction">
       <term><varname>cursor_tuple_fraction</varname> (<type>floating point</type>)</term>
       <indexterm>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index db61569..1ef6289 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -205,6 +205,12 @@ bool		enable_bonjour = false;
 char	   *bonjour_name;
 bool		restart_after_crash = true;
 
+/*
+ * Defined in src/port/qsort.c.  No initializer here: an initialized "extern"
+ * would be a second definition, and the GUC boot value in guc.c is 1 anyway.
+ */
+extern int	degree_of_parallelism;
+
 /* PIDs of special child processes; 0 when not running */
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index bd29c5d..ee13110 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -34,6 +34,9 @@
 #include <signal.h>
 #include <unistd.h>
 #include <sys/time.h>
+#include <limits.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
 
 #include "access/transam.h"
 #include "access/xact.h"
@@ -57,6 +60,10 @@ bool		log_lock_waits = false;
 /* Pointer to this process's PGPROC struct, if any */
 PGPROC	   *MyProc = NULL;
 
+/* Degree-of-parallelism (DOP) state; defined in src/port/qsort.c. */
+extern int	sem_id_dop;		/* semaphore guarding the DOP counter */
+extern int *shm_dop;		/* the DOP counter: processes currently active */
+
 /*
  * This spinlock protects the freelist of recycled PGPROC structures.
  * We cannot use an LWLock because the LWLock manager depends on already
@@ -242,6 +249,22 @@ InitProcess(void)
 	volatile PROC_HDR *procglobal = ProcGlobal;
 	int			i;
 
+	/* Fourth argument of semctl(); not every <sys/sem.h> declares the type. */
+	union semun
+	{
+		int			val;
+		struct semid_ds *buf;
+		unsigned short *array;
+	}			argument;
+	int			dop_shm_id;
+
+	/*
+	 * Process-local stand-in for the DOP counter, used when the shared one
+	 * cannot be set up.  It reads as "limit already reached" for every
+	 * degree_of_parallelism setting, so the sort code never forks.
+	 */
+	static int	dop_disabled = INT_MAX;
+
 	/*
 	 * ProcGlobal should be set up already (if we are a backend, we inherit
 	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
@@ -332,6 +355,61 @@ InitProcess(void)
 		SHMQueueInit(&(MyProc->myProcLocks[i]));
 	MyProc->recoveryConflictPending = false;
 
+	/*
+	 * Set up the per-backend degree-of-parallelism (DOP) bookkeeping: a
+	 * one-int shared memory segment counting the processes currently working
+	 * for this backend, and a one-semaphore set used as a mutex around it.
+	 * Processes fork()ed by this backend inherit both.  Failure is not fatal;
+	 * it only leaves parallelism switched off for this backend.
+	 */
+	sem_id_dop = -1;
+	shm_dop = &dop_disabled;
+
+	/* The counter needs a shmget() id of its own; a semaphore id is not one. */
+	dop_shm_id = shmget(IPC_PRIVATE, sizeof(int), IPC_CREAT | 0600);
+	if (dop_shm_id < 0)
+		elog(LOG, "could not create DOP shared memory segment: %m");
+	else
+	{
+		int		   *counter = (int *) shmat(dop_shm_id, NULL, 0);
+
+		if (counter == (int *) -1)
+			elog(LOG, "could not attach DOP shared memory segment: %m");
+
+		/*
+		 * Mark the segment for removal right away.  It stays usable until
+		 * the last attached process detaches, and it cannot be leaked if
+		 * the backend dies without running ProcKill().
+		 */
+		if (shmctl(dop_shm_id, IPC_RMID, NULL) < 0)
+			elog(LOG, "could not remove DOP shared memory segment: %m");
+
+		if (counter != (int *) -1)
+		{
+			sem_id_dop = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
+			argument.val = 1;
+			if (sem_id_dop >= 0 &&
+				semctl(sem_id_dop, 0, SETVAL, argument) == 0)
+			{
+				/*
+				 * Initialize the DOP to 1 because there is always 1 process
+				 * active, which is the backend that was started to handle
+				 * the connection.
+				 */
+				*counter = 1;
+				shm_dop = counter;
+			}
+			else
+			{
+				elog(LOG, "could not set up DOP semaphore: %m");
+				if (sem_id_dop >= 0)
+					semctl(sem_id_dop, 0, IPC_RMID);
+				sem_id_dop = -1;
+				shmdt(counter);
+			}
+		}
+	}
+
 	/*
 	 * We might be reusing a semaphore that belonged to a failed process. So
 	 * be careful and reinitialize its value here.	(This is not strictly
@@ -712,6 +790,20 @@ ProcKill(int code, Datum arg)
 	/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
 	if (AutovacuumLauncherPid != 0)
 		kill(AutovacuumLauncherPid, SIGUSR2);
+
+	/*
+	 * Release the DOP bookkeeping if InitProcess() managed to set it up.  The
+	 * shared memory segment was marked for removal there, so detaching is all
+	 * it needs; the semaphore set has to be removed with semctl(), since
+	 * shmctl() does not operate on semaphores.
+	 */
+	if (sem_id_dop >= 0)
+	{
+		if (shmdt(shm_dop) < 0)
+			elog(LOG, "could not detach DOP shared memory segment: %m");
+		if (semctl(sem_id_dop, 0, IPC_RMID) < 0)
+			elog(LOG, "could not remove DOP semaphore: %m");
+	}
 }
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b209128..0d23b1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -118,6 +118,7 @@ extern char *temp_tablespaces;
 extern bool synchronize_seqscans;
 extern bool fullPageWrites;
 extern int	ssl_renegotiation_limit;
+extern int	degree_of_parallelism;
 
 #ifdef TRACE_SORT
 extern bool trace_sort;
@@ -2118,6 +2119,15 @@ static struct config_int ConfigureNamesInt[] =
 		1024, 100, 102400, NULL, NULL
 	},
 
+	{
+		{"degree_of_parallelism", PGC_USERSET, QUERY_TUNING_OTHER,
+			gettext_noop("Sets the maximum number of processes working in parallel for each backend."),
+			NULL
+		},
+		&degree_of_parallelism,
+		1, 1, 1024, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d31f1a1..bf58f5f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -246,6 +246,8 @@
 #join_collapse_limit = 8		# 1 disables collapsing of explicit
 					# JOIN clauses
 
+#degree_of_parallelism = 1		# 1 disables parallel sorting; range 1-1024
+
 
 #------------------------------------------------------------------------------
 # ERROR REPORTING AND LOGGING
diff --git a/src/port/qsort.c b/src/port/qsort.c
index 7d50fb8..605c51d 100644
--- a/src/port/qsort.c
+++ b/src/port/qsort.c
@@ -1,5 +1,7 @@
 /*
- *	qsort.c: standard quicksort algorithm
+ *	qsort.c: a modified recursive parallel quicksort algorithm
+ *
+ *	FIXME: Do we need to set up functions for auxiliary processes for sorting?
  *
  *	Modifications from vanilla NetBSD source:
  *	  Add do ... while() macro fix
@@ -43,9 +45,26 @@
  * SUCH DAMAGE.
*/ +#include +#include + #include "c.h" +/* + * FIXME: + * The only reason for defining these three variables here is because the + * quicksort code is built into libpgport and other files, such as + * postmaster.c and proc.c do not. I do not know what other scenarios would + * be more appropriate. + */ +int sem_id_dop; +int *shm_dop; +int degree_of_parallelism; + +static void get_lock(); +static void release_lock(); + static char *med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *)); static void swapfunc(char *, char *, size_t, int); @@ -91,6 +110,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype) #define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype) +static void +get_lock() +{ + struct sembuf operations[1]; + + operations[0].sem_num = 0; + operations[0].sem_op = -1; + operations[0].sem_flg = 0; + /* FIXME: handle error */ + semop(sem_id_dop, operations, 1); +} + +void static +release_lock() +{ + struct sembuf operations[1]; + + operations[0].sem_num = 0; + operations[0].sem_op = 1; + operations[0].sem_flg = 0; + /* FIXME: handle error */ + semop(sem_id_dop, operations, 1); +} + static char * med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *)) { @@ -114,9 +157,18 @@ pg_qsort(void *a, size_t n, size_t es, int (*cmp) (const void *, const void *)) swaptype, presorted; -loop:SWAPINIT(a, es); + /* + * Use -1 to initialize because fork() uses 0 to identify a process as a + * child. + */ + int lchild = -1; + int rchild = -1; + int status; /* For waitpid() only. */ + + SWAPINIT(a, es); if (n < 7) { + /* Insertion sort if less than 7 elements. */ for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0; pl -= es) @@ -124,6 +176,7 @@ loop:SWAPINIT(a, es); return; } presorted = 1; + /* Check if sorted. 
*/ for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) { if (cmp(pm - es, pm) > 0) @@ -148,6 +201,7 @@ loop:SWAPINIT(a, es); } pm = med3(pl, pm, pn, cmp); } + /* Begin "partition" logic. */ swap(a, pm); pa = pb = (char *) a + es; pc = pd = (char *) a + (n - 1) * es; @@ -177,19 +231,64 @@ loop:SWAPINIT(a, es); pb += es; pc -= es; } + /* End "partition" logic. */ pn = (char *) a + n * es; r = Min(pa - (char *) a, pb - pa); vecswap(a, pb - r, r); r = Min(pd - pc, pn - pd - es); vecswap(pb, pn - r, r); if ((r = pb - pa) > es) - qsort(a, r / es, es, cmp); + { + get_lock(); + if (*shm_dop < degree_of_parallelism) + { + /* Under the degree limit, fork. */ + ++*shm_dop; + release_lock(); + + lchild = fork(); /* FIXME: handle error */ + if (lchild == 0) { + /* The 'left' child starts processing. */ + qsort(a, r / es, es, cmp); + + get_lock(); + --*shm_dop; + release_lock(); + exit(0); + } + } + else + { + release_lock(); + qsort(a, r / es, es, cmp); + } + } if ((r = pd - pc) > es) { - /* Iterate rather than recurse to save stack space */ - a = pn - r; - n = r / es; - goto loop; + get_lock(); + if (*shm_dop < degree_of_parallelism) + { + /* Under the degree limit, fork. */ + ++*shm_dop; + release_lock(); + + rchild = fork(); /* FIXME: handle error */ + if (lchild == 0) { + /* The 'right' child starts processing. */ + qsort(pn - r, r / es, es, cmp); + + get_lock(); + --*shm_dop; + release_lock(); + exit(0); + } + } + else + { + release_lock(); + qsort(pn - r, r / es, es, cmp); + } } -/* qsort(pn - r, r / es, es, cmp);*/ + waitpid(lchild, &status, 0); + waitpid(rchild, &status, 0); } diff --git a/src/port/qsort_arg.c b/src/port/qsort_arg.c index 586f2f6..ca26e93 100644 --- a/src/port/qsort_arg.c +++ b/src/port/qsort_arg.c @@ -43,9 +43,19 @@ * SUCH DAMAGE. 
*/ +#include +#include + #include "c.h" +int sem_id_dop; +int *shm_dop; +int degree_of_parallelism; + +static void get_lock(); +static void release_lock(); + static char *med3(char *a, char *b, char *c, qsort_arg_comparator cmp, void *arg); static void swapfunc(char *, char *, size_t, int); @@ -91,6 +101,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype) #define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype) +static void +get_lock() +{ + struct sembuf operations[1]; + + operations[0].sem_num = 0; + operations[0].sem_op = -1; + operations[0].sem_flg = 0; + /* FIXME: handle error */ + semop(sem_id_dop, operations, 1); +} + +void static +release_lock() +{ + struct sembuf operations[1]; + + operations[0].sem_num = 0; + operations[0].sem_op = 1; + operations[0].sem_flg = 0; + /* FIXME: handle error */ + semop(sem_id_dop, operations, 1); +} + static char * med3(char *a, char *b, char *c, qsort_arg_comparator cmp, void *arg) { @@ -114,7 +148,11 @@ qsort_arg(void *a, size_t n, size_t es, qsort_arg_comparator cmp, void *arg) swaptype, presorted; -loop:SWAPINIT(a, es); + int lchild = -1; + int rchild = -1; + int status; /* For waitpid() only. */ + + SWAPINIT(a, es); if (n < 7) { for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) @@ -183,13 +221,57 @@ loop:SWAPINIT(a, es); r = Min(pd - pc, pn - pd - es); vecswap(pb, pn - r, r); if ((r = pb - pa) > es) - qsort_arg(a, r / es, es, cmp, arg); + { + get_lock(); + if (*shm_dop < degree_of_parallelism) + { + /* Under the degree limit, fork. */ + ++*shm_dop; + release_lock(); + + lchild = fork(); /* FIXME: handle error */ + if (lchild == 0) { + /* The 'left' child starts processing. 
*/ + qsort_arg(a, r / es, es, cmp, arg); + + get_lock(); + --*shm_dop; + release_lock(); + exit(0); + } + } + else + { + release_lock(); + qsort_arg(a, r / es, es, cmp, arg); + } + } if ((r = pd - pc) > es) { - /* Iterate rather than recurse to save stack space */ - a = pn - r; - n = r / es; - goto loop; + get_lock(); + if (*shm_dop < degree_of_parallelism) + { + /* Under the degree limit, fork. */ + ++*shm_dop; + release_lock(); + + rchild = fork(); /* FIXME: handle error */ + if (lchild == 0) { + /* The 'right' child starts processing. */ + qsort_arg(pn - r, r / es, es, cmp, arg); + + get_lock(); + --*shm_dop; + release_lock(); + exit(0); + } + } + else + { + release_lock(); + qsort_arg(pn - r, r / es, es, cmp, arg); + } } -/* qsort_arg(pn - r, r / es, es, cmp, arg);*/ + waitpid(lchild, &status, 0); + waitpid(rchild, &status, 0); }