Re: PATCH: pgbench - option to build using ppoll() for larger connection counts - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: PATCH: pgbench - option to build using ppoll() for larger connection counts |
Date | |
Msg-id | 27421.1537632580@sss.pgh.pa.us Whole thread Raw |
In response to | Re: PATCH: pgbench - option to build using ppoll() for larger connection counts (Fabien COELHO <coelho@cri.ensmp.fr>) |
Responses |
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts
|
List | pgsql-hackers |
Fabien COELHO <coelho@cri.ensmp.fr> writes: > The patch was not applying cleanly anymore for me, so here is a rebase of > your latest version. The cfbot doesn't like that patch, probably because of the Windows newlines. Here's a version with regular newlines, and some cosmetic cleanup in the configure infrastructure. I haven't looked at the pgbench changes proper yet, but I did quickly test building on FreeBSD 11, which has ppoll, and it falls over: pgbench.c:6080:69: error: use of undeclared identifier 'POLLRDHUP' ...== -1 || (PQsocket(con) >= 0 && !(sa[idx].revents & POLL_UNWANTED))) ^ pgbench.c:6059:24: note: expanded from macro 'POLL_UNWANTED' #define POLL_UNWANTED (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL) ^ pgbench.c:6085:42: error: use of undeclared identifier 'POLLRDHUP' errno, sa[idx].fd, (sa[idx].revents & POLL_UNWANTED)); ^ pgbench.c:6059:24: note: expanded from macro 'POLL_UNWANTED' #define POLL_UNWANTED (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL) ^ pgbench.c:6107:19: error: use of undeclared identifier 'POLLRDHUP' sa[idx].events = POLL_EVENTS; ^ pgbench.c:6057:22: note: expanded from macro 'POLL_EVENTS' #define POLL_EVENTS (POLLRDHUP|POLLIN|POLLPRI) ^ 3 errors generated. make[3]: *** [<builtin>: pgbench.o] Error 1 I'm strongly tempted to just remove the POLL_UNWANTED business altogether, as it seems both pointless and unportable on its face. Almost by definition, we can't know what "other" bits a given implementation might set. I'm not entirely following the point of including POLLRDHUP in POLL_EVENTS, either. What's wrong with the traditional solution of detecting EOF? regards, tom lane diff --git a/configure b/configure index 9b30402..21ecd29 100755 *** a/configure --- b/configure *************** fi *** 15093,15099 **** LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` ! 
for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l do : as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" --- 15093,15099 ---- LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` ! for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l do : as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" diff --git a/configure.in b/configure.in index 2e60a89..8fe6894 100644 *** a/configure.in --- b/configure.in *************** PGAC_FUNC_WCSTOMBS_L *** 1562,1568 **** LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` ! AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) AC_REPLACE_FUNCS(fseeko) case $host_os in --- 1562,1568 ---- LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` ! 
AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocateppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_rangeutime utimes wcstombs_l]) AC_REPLACE_FUNCS(fseeko) case $host_os in diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 41b756c..3d378db 100644 *** a/src/bin/pgbench/pgbench.c --- b/src/bin/pgbench/pgbench.c *************** *** 45,53 **** --- 45,62 ---- #include <signal.h> #include <time.h> #include <sys/time.h> + #ifndef PGBENCH_USE_SELECT /* force use of select(2)? */ + #ifdef HAVE_PPOLL + #define POLL_USING_PPOLL + #include <poll.h> + #endif + #endif + #ifndef POLL_USING_PPOLL + #define POLL_USING_SELECT #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> #endif + #endif #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> /* for getrlimit */ *************** static int pthread_join(pthread_t th, vo *** 92,104 **** /******************************************************************** * some configurable parameters */ ! ! /* max number of clients allowed */ #ifdef FD_SETSIZE ! #define MAXCLIENTS (FD_SETSIZE - 10) #else ! #define MAXCLIENTS 1024 #endif #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ --- 101,119 ---- /******************************************************************** * some configurable parameters */ ! #ifdef POLL_USING_SELECT /* using select(2) */ ! #define SOCKET_WAIT_METHOD "select" ! typedef fd_set socket_set; #ifdef FD_SETSIZE ! #define MAXCLIENTS (FD_SETSIZE - 10) /* system limited max number of clients allowed */ #else ! 
#define MAXCLIENTS 1024 /* max number of clients allowed */ #endif + #else /* using ppoll(2) */ + #define SOCKET_WAIT_METHOD "ppoll" + typedef struct pollfd socket_set; + #define MAXCLIENTS -1 /* unlimited number of clients */ + #endif /* POLL_USING_SELECT */ #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ *************** static void addScript(ParsedScript scrip *** 525,530 **** --- 540,552 ---- static void *threadRun(void *arg); static void setalarm(int seconds); static void finishCon(CState *st); + static socket_set *alloc_socket_set(int count); + static bool error_on_socket(socket_set *sa, int idx, PGconn *con); + static void free_socket_set(socket_set *sa); + static bool ignore_socket(socket_set *sa, int idx, PGconn *con); + static void clear_socket_set(socket_set *sa, int count); + static void set_socket(socket_set *sa, int fd, int idx); + static int wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec); /* callback functions for our flex lexer */ *************** doConnect(void) *** 1143,1148 **** --- 1165,1171 ---- !have_password) { PQfinish(conn); + conn = NULL; simple_prompt("Password: ", password, sizeof(password), false); have_password = true; new_pass = true; *************** main(int argc, char **argv) *** 4903,4909 **** case 'c': benchmarking_option_set = true; nclients = atoi(optarg); ! if (nclients <= 0 || nclients > MAXCLIENTS) { fprintf(stderr, "invalid number of clients: \"%s\"\n", optarg); --- 4926,4932 ---- case 'c': benchmarking_option_set = true; nclients = atoi(optarg); ! if (nclients <= 0 || (MAXCLIENTS != -1 && nclients > MAXCLIENTS)) { fprintf(stderr, "invalid number of clients: \"%s\"\n", optarg); *************** threadRun(void *arg) *** 5614,5619 **** --- 5637,5643 ---- int64 next_report = last_report + (int64) progress * 1000000; StatsData last, aggs; + socket_set *sockets = alloc_socket_set(nstate); /* * Initialize throttling rate target for all of the thread's clients. 
It *************** threadRun(void *arg) *** 5657,5662 **** --- 5681,5687 ---- { if ((state[i].con = doConnect()) == NULL) goto done; + set_socket(sockets, PQsocket(state[i].con), i); } } *************** threadRun(void *arg) *** 5673,5685 **** /* loop till all clients have terminated */ while (remains > 0) { - fd_set input_mask; int maxsock; /* max socket number to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ /* identify which client sockets should be checked for input */ ! FD_ZERO(&input_mask); maxsock = -1; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) --- 5698,5709 ---- /* loop till all clients have terminated */ while (remains > 0) { int maxsock; /* max socket number to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ /* identify which client sockets should be checked for input */ ! clear_socket_set(sockets, nstate); maxsock = -1; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) *************** threadRun(void *arg) *** 5728,5734 **** goto done; } ! FD_SET(sock, &input_mask); if (maxsock < sock) maxsock = sock; } --- 5752,5758 ---- goto done; } ! set_socket(sockets, sock, i); if (maxsock < sock) maxsock = sock; } *************** threadRun(void *arg) *** 5765,5771 **** /* * If no clients are ready to execute actions, sleep until we receive * data from the server, or a nap-time specified in the script ends, ! * or it's time to print a progress report. Update input_mask to show * which client(s) received data. */ if (min_usec > 0) --- 5789,5795 ---- /* * If no clients are ready to execute actions, sleep until we receive * data from the server, or a nap-time specified in the script ends, ! * or it's time to print a progress report. Update sockets to show * which client(s) received data. */ if (min_usec > 0) *************** threadRun(void *arg) *** 5776,5786 **** { if (maxsock != -1) { ! struct timeval timeout; ! ! timeout.tv_sec = min_usec / 1000000; ! 
timeout.tv_usec = min_usec % 1000000; ! nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); } else /* nothing active, simple sleep */ { --- 5800,5806 ---- { if (maxsock != -1) { ! nsocks = wait_on_socket_set(sockets, nstate, maxsock, min_usec); } else /* nothing active, simple sleep */ { *************** threadRun(void *arg) *** 5789,5795 **** } else /* no explicit delay, select without timeout */ { ! nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); } if (nsocks < 0) --- 5809,5815 ---- } else /* no explicit delay, select without timeout */ { ! nsocks = wait_on_socket_set(sockets, nstate, maxsock, 0); } if (nsocks < 0) *************** threadRun(void *arg) *** 5800,5806 **** continue; } /* must be something wrong */ ! fprintf(stderr, "select() failed: %s\n", strerror(errno)); goto done; } } --- 5820,5826 ---- continue; } /* must be something wrong */ ! fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno)); goto done; } } *************** threadRun(void *arg) *** 5809,5815 **** /* min_usec == 0, i.e. something needs to be executed */ /* If we didn't call select(), don't try to read any data */ ! FD_ZERO(&input_mask); } /* ok, advance the state machine of each connection */ --- 5829,5835 ---- /* min_usec == 0, i.e. something needs to be executed */ /* If we didn't call select(), don't try to read any data */ ! clear_socket_set(sockets, nstate); } /* ok, advance the state machine of each connection */ *************** threadRun(void *arg) *** 5820,5835 **** if (st->state == CSTATE_WAIT_RESULT) { /* don't call doCustom unless data is available */ - int sock = PQsocket(st->con); ! if (sock < 0) ! { ! fprintf(stderr, "invalid socket: %s", ! PQerrorMessage(st->con)); goto done; - } ! if (!FD_ISSET(sock, &input_mask)) continue; } else if (st->state == CSTATE_FINISHED || --- 5840,5850 ---- if (st->state == CSTATE_WAIT_RESULT) { /* don't call doCustom unless data is available */ ! if (error_on_socket(sockets, i, st->con)) goto done; ! 
if (ignore_socket(sockets, i, st->con)) continue; } else if (st->state == CSTATE_FINISHED || *************** done: *** 5967,5972 **** --- 5982,5989 ---- fclose(thread->logfile); thread->logfile = NULL; } + free_socket_set(sockets); + sockets = NULL; return NULL; } *************** finishCon(CState *st) *** 5980,5985 **** --- 5997,6131 ---- } } + #ifdef POLL_USING_SELECT /* select(2) based socket polling */ + static socket_set * + alloc_socket_set(int count) + { + return (socket_set *) pg_malloc0(sizeof(socket_set)); + } + + static void + free_socket_set(socket_set *sa) + { + pg_free(sa); + } + + static bool + error_on_socket(socket_set *sa, int idx, PGconn *con) + { + if (PQsocket(con) >= 0) return false; + fprintf(stderr, "invalid socket: %s", PQerrorMessage(con)); + return true; + } + + static bool + ignore_socket(socket_set *sa, int idx, PGconn *con) + { + return !(FD_ISSET(PQsocket(con), sa)); + } + + static void + clear_socket_set(socket_set *sa, int count) + { + FD_ZERO(sa); + } + + static void + set_socket(socket_set *sa, int fd, int idx) + { + FD_SET(fd, sa); + } + + static int + wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec) + { + struct timeval timeout; + + if (usec) + { + timeout.tv_sec = usec / 1000000; + timeout.tv_usec = usec % 1000000; + return select(maxsock + 1, sa, NULL, NULL, &timeout); + } + else + { + return select(maxsock + 1, sa, NULL, NULL, NULL); + } + } + #else /* ppoll(2) based socket polling */ + /* ppoll() will block until timeout or one of POLL_EVENTS occurs. */ + #define POLL_EVENTS (POLLRDHUP|POLLIN|POLLPRI) + /* ppoll() events returned that we do not want/expect to see. 
*/ + #define POLL_UNWANTED (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL) + + static socket_set * + alloc_socket_set(int count) + { + return (socket_set *) pg_malloc0(sizeof(socket_set) * count); + } + + static void + free_socket_set(socket_set *sa) + { + pg_free(sa); + } + + static bool + error_on_socket(socket_set *sa, int idx, PGconn *con) + { + /* + * No error if socket not used or non-error status from PQsocket() and none + * of the unwanted ppoll() return events. + */ + if (sa[idx].fd == -1 || (PQsocket(con) >= 0 && !(sa[idx].revents & POLL_UNWANTED))) + return false; + fprintf(stderr, "invalid socket: %s", PQerrorMessage(con)); + if (debug) + fprintf(stderr, "ppoll() fail - errno: %d, socket: %d, events: %x\n", + errno, sa[idx].fd, (sa[idx].revents & POLL_UNWANTED)); + return true; + } + + static bool + ignore_socket(socket_set *sa, int idx, PGconn *con) + { + return (sa[idx].fd != -1 && !sa[idx].revents); + } + + static void + clear_socket_set(socket_set *sa, int count) + { + int i = 0; + for (i = 0; i < count; i++) + set_socket(sa, -1, i); + } + + static void + set_socket(socket_set *sa, int fd, int idx) + { + sa[idx].fd = fd; + sa[idx].events = POLL_EVENTS; + sa[idx].revents = 0; + } + + static int + wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec) + { + struct timespec timeout; + + if (usec) + { + timeout.tv_sec = usec / 1000000; + timeout.tv_nsec = usec % 1000000000; + return ppoll(sa, nstate, &timeout, NULL); + } + else + { + return ppoll(sa, nstate, NULL, NULL); + } + } + #endif /* PGBENCH_USE_SELECT */ + /* * Support for duration option: set timer_exceeded after so many seconds. */ diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 4094e22..5d40796 100644 *** a/src/include/pg_config.h.in --- b/src/include/pg_config.h.in *************** *** 443,448 **** --- 443,451 ---- /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. 
*/ #undef HAVE_PPC_LWARX_MUTEX_HINT + /* Define to 1 if you have the `ppoll' function. */ + #undef HAVE_PPOLL + /* Define to 1 if you have the `pstat' function. */ #undef HAVE_PSTAT diff --git a/src/include/pg_config.h.win32 b/src/include/pg_config.h.win32 index 6618b43..182698a 100644 *** a/src/include/pg_config.h.win32 --- b/src/include/pg_config.h.win32 *************** *** 327,332 **** --- 327,335 ---- /* Define to 1 if you have the `posix_fallocate' function. */ /* #undef HAVE_POSIX_FALLOCATE */ + /* Define to 1 if you have the `ppoll' function. */ + /* #undef HAVE_PPOLL */ + /* Define to 1 if you have the `pstat' function. */ /* #undef HAVE_PSTAT */ diff --git a/src/template/linux b/src/template/linux index f820bf7..e392908 100644 *** a/src/template/linux --- b/src/template/linux *************** if test x"$PREFERRED_SEMAPHORES" = x"" ; *** 6,11 **** --- 6,12 ---- fi # Force _GNU_SOURCE on; plperl is broken with Perl 5.8.0 otherwise + # This is also required for ppoll(2), and perhaps other things CPPFLAGS="$CPPFLAGS -D_GNU_SOURCE" # If --enable-profiling is specified, we need -DLINUX_PROFILE
pgsql-hackers by date: