libpq PQexec not thread-safe - Mailing list pgsql-bugs

From Chris Brown
Subject libpq PQexec not thread-safe
Date
Msg-id 3E77E2B6.2030406@chibi.ca
Whole thread Raw
List pgsql-bugs
Author note:
I am not in any of the pg mailing lists.  Please contact me directly for
further information, or invite me to an appropriate mailing list.

Summary:
libpq PQexec is not thread safe with respect to the async signal handler
for SIGPIPE.  This can result in problems ranging from an app signal handler
being reset to the app exiting inside libpq as a result of a default
signal handler.  (The issue is a race condition, so results are varied.)

Environment:
 - RedHat Linux using pthreads (redhat 6.2 and 8), SMP 4-way, intel
 - libpq 7.3.2

Steps:
 - The application sets sigaction SIGPIPE to something other than SIG_IGN
   or leaves SIG_DFL
 - Open two threads
 - Each thread creates its own connection
 - Each thread performs independent access to the database in an
   unsynchronized way
 - Join the two threads
 - Check the final signal handler

Actual Results:
 - The SIGPIPE handler changes when two PQexecs occur 'at the same time'
 - The SIGPIPE handler will tend towards SIG_IGN
 - Sometimes a PQexec will be operating with SIG_DFL as the SIGPIPE handler
   (if the app leaves the default SIG_DFL SIGPIPE handler)
 - The final signal handler is SIG_IGN most of the time

Expected Results:
 - While at least one PQexec is running, the SIGPIPE handler will be
   SIG_IGN even though the application has a different SIGPIPE handler
 - While no PQexecs are running, the SIGPIPE handler will be whatever
   the app set it to
 - PQexec will never be left with a SIG_IGN SIGPIPE handler
 - The final signal handler is whatever the app set it to initially

Additional notes:
 - While sigaction provides flags, they are not being stored or restored

Suggestions:
 - As a hack, make all applications that use libpq first set SIGPIPE's
   sigaction to SIG_IGN (this clears up all problems)
 - Use an internal mutex to prevent sigaction from being called from
   two different threads between set and reset calls (an internal counter)
 - Allow the application to tell libpq not to touch sigactions
 - Allow the application to provide a signal handler callback
 - Restore the entire sigaction structure rather than just the sa_handler

Example of failure:
#include <libpq-fe.h>
#include <signal.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>



/* Forward declarations of the file-local helpers defined below. */
static int parse_args ( int argc, char * argv [] );
static int set_sigpipe ( void );
static int start_thread ( pthread_t * thread_id );
static int join_thread ( pthread_t thread_id );
static void pipe_sighandler ( int signum );
static void * pg_thread ( void * notused );
static void check_pipe_handler ( int count );

/*
 * Number of SIGPIPE deliveries seen by pipe_sighandler().  The object is
 * modified from inside a signal handler, so it is declared
 * volatile sig_atomic_t: that is the only object type ISO C lets a
 * handler write to with defined behaviour.
 */
static volatile sig_atomic_t g_pipe_count;

/*
 * Connection parameters, filled in by parse_args() from the command
 * line.  Each one stays NULL when its option was not given.
 */
static const char * g_db_name;
static const char * g_db_host;
static const char * g_db_port;
static const char * g_db_user;
static const char * g_db_pass;



/*
 * Program entry point: parse the command line, install the test SIGPIPE
 * handler, run two worker threads to completion and finally inspect the
 * installed SIGPIPE handler once more.
 *
 * Returns 0 when every step succeeded, 2 as soon as any step fails.
 */
int main ( int argc, char * argv [] )
{
        pthread_t workers [ 2 ];
        int       i;

        if ( parse_args( argc, argv ) < 0  ||  set_sigpipe() < 0 )
                return( 2 );

        /* Start both workers before waiting on either of them. */
        for ( i = 0; i < 2; i++ ) {
                if ( start_thread( &workers[ i ] ) < 0 )
                        return( 2 );
        }

        for ( i = 0; i < 2; i++ ) {
                if ( join_thread( workers[ i ] ) < 0 )
                        return( 2 );
        }

        /* A non-positive count is never suppressed by check_pipe_handler(). */
        check_pipe_handler( -1 );

        return( 0 );
}



/*
 * Parse the command line into the g_db_* connection parameters.
 *
 * Recognised options (each takes an argument):
 *   -d <db>  -h <host>  -p <port>  -U <user>  -W <pass>
 *
 * Returns 0 on success.  Returns -1 after printing a usage message to
 * stderr when an unknown option (or a missing argument) is seen, or
 * after printing the error when the option value cannot be duplicated.
 */
static int parse_args ( int argc, char * argv [] )
{
        /*
         * Declared here explicitly so the function does not depend on
         * <unistd.h> exposing them in strict ISO C compilation modes.
         */
        extern char * optarg;
        extern int    opterr;
        const char ** target;
        int res;

        /* Suppress getopt()'s own diagnostics; the usage line is printed here. */
        opterr = 0;

        while ( (res = getopt( argc, argv, "d:h:p:U:W:" )) != -1 ) {
                switch ( res ) {
                case 'd':
                        target = &g_db_name;
                        break;
                case 'h':
                        target = &g_db_host;
                        break;
                case 'p':
                        target = &g_db_port;
                        break;
                case 'U':
                        target = &g_db_user;
                        break;
                case 'W':
                        target = &g_db_pass;
                        break;
                default:
                        fprintf( stderr, "%s [-d <db>] [-h <host>] [-p <port>] "
                                 "[-U <user>] [-W <pass>]\n", argv[ 0 ] );
                        return( -1 );
                }

                *target = strdup( optarg );
                if ( !*target ) {
                        fprintf( stderr, "strdup( ... ):%s(%d)\n",
                                 strerror( errno ), errno );
                        return( -1 );
                }
        }

        return( 0 );
}



static int set_sigpipe ( void )
{
        struct sigaction new_act;
        struct sigaction old_act;

        g_pipe_count = 0;
        memset( &new_act, 0, sizeof( new_act ) );
        memset( &old_act, 0, sizeof( old_act ) );
        new_act.sa_handler = pipe_sighandler;

        if ( sigaction( SIGPIPE, &new_act, &old_act ) ) {
                fprintf( stderr, "sigaction( SIGPIPE, ... ):%s(%d)\n",
                         strerror( errno ), errno );
                return( -1 );
        }

        return( 0 );
}



/* SIGPIPE handler: records each delivery by bumping g_pipe_count. */
static void pipe_sighandler ( int signum )
{
        (void)signum;           /* the signal number itself is not used */
        g_pipe_count += 1;
}


/*
 * Start one worker thread running pg_thread() with default attributes.
 *
 * thread_id: receives the identifier of the new thread; it is cleared
 *            to 0 before the creation attempt.
 *
 * Returns 0 on success.  When pthread_create() fails the error is
 * printed to stderr and -1 is returned.
 */
static int start_thread ( pthread_t * thread_id )
{
        int rc;

        *thread_id = 0;

        rc = pthread_create( thread_id, NULL, pg_thread, NULL );
        if ( rc == 0 )
                return( 0 );

        fprintf( stderr, "pthread_create(...):%s(%d)\n",
                 strerror( rc ), rc );
        return( -1 );
}



/*
 * Wait for a thread to finish and check the value it returned.
 *
 * thread_id: identifier of the thread to join.
 *
 * Returns 0 when the join succeeded and the thread returned NULL.
 * Returns -1 when pthread_join() failed (the error is printed to
 * stderr) or when the thread returned a non-NULL value.
 */
static int join_thread ( pthread_t thread_id )
{
        int    result;
        void * status = NULL;

        /*
         * pthread_join() stores a complete void * through its second
         * argument, so the receiving object has to be a void *.  Handing
         * it the address of an int overruns that int wherever pointers
         * are wider than int.
         */
        result = pthread_join( thread_id, &status );
        if ( result ) {
                fprintf( stderr, "pthread_join(...):%s(%d)\n",
                         strerror( result ), result );
                return( -1 );
        }
        if ( status != NULL )  return( -1 );
        return( 0 );
}



/*
 * Append "key='value'" (preceded by a space when the buffer is not
 * empty) to the conninfo string being built in buf.
 *
 * buf/size: destination buffer and its capacity in bytes.
 * len:      in/out, current string length in buf.
 * value:    parameter value; a NULL value is skipped so libpq falls back
 *           to its own default for that parameter.
 *
 * Returns 0 on success, -1 if the parameter does not fit in buf.
 * NOTE: the value is not escaped, so it must not contain ' or \.
 */
static int append_conn_param ( char * buf, size_t size, size_t * len,
                               const char * key, const char * value )
{
        int n;

        if ( !value )  return( 0 );

        n = snprintf( buf + *len, size - *len, "%s%s='%s'",
                      *len ? " " : "", key, value );
        if ( n < 0  ||  (size_t)n >= size - *len )  return( -1 );

        *len += (size_t)n;
        return( 0 );
}



/*
 * Worker thread body: open a private connection built from the g_db_*
 * parameters and run the same query 1000 times, calling
 * check_pipe_handler() before each PQexec().
 *
 * notused: ignored.
 *
 * Returns (void *)0 on success and (void *)-2 on any failure (the
 * reason is printed to stderr).  The connection is closed with
 * PQfinish() on every path once PQconnectdb() has returned non-NULL.
 */
static void * pg_thread ( void * notused )
{
        char             conninfo [ 512 ] = "";
        size_t           len = 0;
        PGconn         * conn;
        ConnStatusType   status;
        PGresult       * result;
        int              count;

        (void)notused;

        /* Only parameters actually supplied are put in the string. */
        if ( append_conn_param( conninfo, sizeof( conninfo ), &len,
                                "dbname", g_db_name ) < 0  ||
             append_conn_param( conninfo, sizeof( conninfo ), &len,
                                "user", g_db_user ) < 0  ||
             append_conn_param( conninfo, sizeof( conninfo ), &len,
                                "password", g_db_pass ) < 0  ||
             append_conn_param( conninfo, sizeof( conninfo ), &len,
                                "host", g_db_host ) < 0  ||
             append_conn_param( conninfo, sizeof( conninfo ), &len,
                                "port", g_db_port ) < 0 ) {
                fprintf( stderr, "connection parameters too long\n" );
                return( (void *)-2 );
        }

        conn = PQconnectdb( conninfo );
        if ( !conn ) {
                fprintf( stderr, "PQconnectdb( %s ) = NULL\n", conninfo );
                return( (void *)-2 );
        }
        status = PQstatus( conn );
        if ( status != CONNECTION_OK ) {
                fprintf( stderr, "PQstatus( ... ) = %d\n%s\n", (int)status,
                         PQerrorMessage( conn ) );
                /* A failed connection object still has to be released. */
                PQfinish( conn );
                return( (void *)-2 );
        }

        for ( count = 0; count < 1000; count++ ) {
                check_pipe_handler( count );
                result = PQexec( conn, "SELECT proname FROM pg_proc" );
                if ( !result ) {
                        fprintf( stderr, "PQexec( ... ) = NULL\n" );
                        PQfinish( conn );
                        return( (void *)-2 );
                }
                PQclear( result );
        }

        PQfinish( conn );
        return( (void *)0 );
}



/*
 * Verify that the installed SIGPIPE handler is still pipe_sighandler()
 * and report to stderr when it is not.  Also resets g_pipe_count.
 *
 * count: iteration number printed with the report.  After the first two
 *        mismatches, further reports are suppressed for count > 0;
 *        calls with count <= 0 are always reported.
 *
 * The function is called concurrently from the worker threads, so the
 * mismatch counter is updated under a mutex.
 */
static void check_pipe_handler ( int count )
{
        static pthread_mutex_t err_lock = PTHREAD_MUTEX_INITIALIZER;
        static int err_cnt = 0;
        struct sigaction old_act;
        int seen;

        g_pipe_count = 0;
        memset( &old_act, 0, sizeof( old_act ) );

        if ( sigaction( SIGPIPE, NULL, &old_act ) ) {
                fprintf( stderr, "sigaction( SIGPIPE, ... ):%s(%d)\n",
                         strerror( errno ), errno );
                /* old_act holds no real data; do not report a bogus handler. */
                return;
        }

        if ( old_act.sa_handler == pipe_sighandler )  return;

        pthread_mutex_lock( &err_lock );
        seen = ++err_cnt;
        pthread_mutex_unlock( &err_lock );

        if ( seen > 2  &&  count > 0 )  return;

        /* %p requires a void *; a function pointer must be cast explicitly. */
        fprintf( stderr, "sa_handler = %p (count %4d)\n",
                 (void *)old_act.sa_handler, count );
}

pgsql-bugs by date:

Previous
From: Ed Schaller
Date:
Subject: Re: Bug #890: only one user per process in libpq with krb5 auth
Next
From: pgsql-bugs@postgresql.org
Date:
Subject: Bug #914: Possible bug with regards to multiple persistant connections