Re: Unexpected interval comparison - Mailing list pgsql-general

From Tom Lane
Subject Re: Unexpected interval comparison
Date
Msg-id 27982.1491421870@sss.pgh.pa.us
Whole thread Raw
In response to Re: Unexpected interval comparison  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses Re: Unexpected interval comparison  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-general
Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> The attached patch is the revised version.

Hmm, this still isn't right --- testing shows that you had the comparison
rule right the first time.

Looking at what we've got here, it's already a substantial fraction of
what would be needed to provide a compiler-independent implementation
of the int128-based aggregate logic in numeric.c.  With that in mind,
I propose that we extract the relevant stuff into a new header file
that is designed as general-purpose int128 support.  Something like the
attached.  I also attach the test program I put together to verify it.

On my Fedora 25 laptop, it appears that the hand-rolled implementation
is actually respectably fast compared to gcc's "native" functionality;
the test program runs in ~2m for 1B iterations with the native logic,
and ~2.5m with the hand-rolled logic.  Allowing for overhead and the
fact that we're doing the arithmetic twice, we're probably within 2X
of the native code.  Not bad at all.

I'm not entirely sure what to do with the test program:
1. discard it
2. commit it as utils/adt/int128.c, as suggested in its comment
3. commit it somewhere else, maybe src/tools/.

Thoughts?

            regards, tom lane

/*-------------------------------------------------------------------------
 *
 * int128.h
 *      Roll-our-own 128-bit integer arithmetic.
 *
 * We make use of the native int128 type if there is one, otherwise
 * implement things the hard way based on two int64 halves.
 *
 * Copyright (c) 2017, PostgreSQL Global Development Group
 *
 * src/include/utils/int128.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef INT128_H
#define INT128_H

/*
 * For testing purposes, use of native int128 can be switched on/off by
 * predefining USE_NATIVE_INT128.
 */
#ifndef USE_NATIVE_INT128
#ifdef HAVE_INT128
#define USE_NATIVE_INT128 1
#else
#define USE_NATIVE_INT128 0
#endif
#endif


#if USE_NATIVE_INT128

typedef int128 INT128;

/*
 * Add an unsigned int64 value into an INT128 variable.
 */
static inline void
int128_add_uint64(INT128 *i128, uint64 v)
{
    *i128 += v;
}

/*
 * Add a signed int64 value into an INT128 variable.
 */
static inline void
int128_add_int64(INT128 *i128, int64 v)
{
    *i128 += v;
}

/*
 * Add the 128-bit product of two int64 values into an INT128 variable.
 *
 * XXX with a stupid compiler, this could actually be less efficient than
 * the other implementation; maybe we should do it by hand always?
 */
static inline void
int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
{
    *i128 += (int128) x *(int128) y;
}

/*
 * Compare two INT128 values, return -1, 0, or +1.
 */
static inline int
int128_compare(INT128 x, INT128 y)
{
    if (x < y)
        return -1;
    if (x > y)
        return 1;
    return 0;
}

#else                            /* !USE_NATIVE_INT128 */

/*
 * We lay out the INT128 structure with the same content and byte ordering
 * that a native int128 type would (probably) have.  This makes no difference
 * for ordinary use of INT128, but allows union'ing INT128 with int128 for
 * testing purposes.
 */
typedef struct
{
#ifdef WORDS_BIGENDIAN
    int64        hi;                /* most significant 64 bits, including sign */
    uint64        lo;                /* least significant 64 bits, without sign */
#else
    uint64        lo;                /* least significant 64 bits, without sign */
    int64        hi;                /* most significant 64 bits, including sign */
#endif
} INT128;

/*
 * Add an unsigned int64 value into an INT128 variable.
 */
static inline void
int128_add_uint64(INT128 *i128, uint64 v)
{
    /*
     * First add the value to the .lo part, then check to see if a carry needs
     * to be propagated into the .hi part.  A carry is needed if both inputs
     * have high bits set, or if just one input has high bit set while the new
     * .lo part doesn't.  Remember that .lo part is unsigned; we cast to
     * signed here just as a cheap way to check the high bit.
     */
    uint64        oldlo = i128->lo;

    i128->lo += v;
    if (((int64) v < 0 && (int64) oldlo < 0) ||
        (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0))
        i128->hi++;
}

/*
 * Add a signed int64 value into an INT128 variable.
 */
static inline void
int128_add_int64(INT128 *i128, int64 v)
{
    /*
     * This is much like the above except that the carry logic differs for
     * negative v.  Ordinarily we'd need to subtract 1 from the .hi part
     * (corresponding to adding the sign-extended bits of v to it); but if
     * there is a carry out of the .lo part, that cancels and we do nothing.
     */
    uint64        oldlo = i128->lo;

    i128->lo += v;
    if (v >= 0)
    {
        if ((int64) oldlo < 0 && (int64) i128->lo >= 0)
            i128->hi++;
    }
    else
    {
        if (!((int64) oldlo < 0 || (int64) i128->lo >= 0))
            i128->hi--;
    }
}

/*
 * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while
 * INT64_AL32 extracts the least significant 32 bits as uint64.
 */
#define INT64_AU32(i64) ((i64) >> 32)
#define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF))

/*
 * Add the 128-bit product of two int64 values into an INT128 variable.
 */
static inline void
int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
{
    /* INT64_AU32 must use arithmetic right shift */
    StaticAssertStmt(((int64) -1 >> 1) == (int64) -1,
                     "arithmetic right shift is needed");

    /*----------
     * Form the 128-bit product x * y using 64-bit arithmetic.
     * Considering each 64-bit input as having 32-bit high and low parts,
     * we can compute
     *
     *     x * y = ((x.hi << 32) + x.lo) * (((y.hi << 32) + y.lo)
     *           = (x.hi * y.hi) << 64 +
     *             (x.hi * y.lo) << 32 +
     *             (x.lo * y.hi) << 32 +
     *             x.lo * y.lo
     *
     * Each individual product is of 32-bit terms so it won't overflow when
     * computed in 64-bit arithmetic.  Then we just have to shift it to the
     * correct position while adding into the 128-bit result.  We must also
     * keep in mind that the "lo" parts must be treated as unsigned.
     *----------
     */

    /* No need to work hard if product must be zero */
    if (x != 0 && y != 0)
    {
        int64        x_u32 = INT64_AU32(x);
        uint64        x_l32 = INT64_AL32(x);
        int64        y_u32 = INT64_AU32(y);
        uint64        y_l32 = INT64_AL32(y);
        int64        tmp;

        /* the first term */
        i128->hi += x_u32 * y_u32;

        /* the second term: sign-extend it only if x is negative */
        tmp = x_u32 * y_l32;
        if (x < 0)
            i128->hi += INT64_AU32(tmp);
        else
            i128->hi += ((uint64) tmp) >> 32;
        int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);

        /* the third term: sign-extend it only if y is negative */
        tmp = x_l32 * y_u32;
        if (y < 0)
            i128->hi += INT64_AU32(tmp);
        else
            i128->hi += ((uint64) tmp) >> 32;
        int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);

        /* the fourth term: always unsigned */
        int128_add_uint64(i128, x_l32 * y_l32);
    }
}

/*
 * Compare two INT128 values, return -1, 0, or +1.
 */
static inline int
int128_compare(INT128 x, INT128 y)
{
    if (x.hi < y.hi)
        return -1;
    if (x.hi > y.hi)
        return 1;
    if (x.lo < y.lo)
        return -1;
    if (x.lo > y.lo)
        return 1;
    return 0;
}

#endif   /* USE_NATIVE_INT128 */

#endif   /* INT128_H */
/*-------------------------------------------------------------------------
 *
 * int128.c
 *      Testbed for roll-our-own 128-bit integer arithmetic.
 *
 * This file is not meant to be compiled into the backend; rather, it builds
 * a standalone test program that compares the behavior of an implementation
 * in int128.h to an (assumed correct) int128 native type.
 *
 * Copyright (c) 2017, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *      src/backend/utils/adt/int128.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

/*
 * By default, we test the non-native implementation in int128.h; but
 * by predefining USE_NATIVE_INT128 to 1, you can test the native
 * implementation, just to be sure.
 */
#ifndef USE_NATIVE_INT128
#define USE_NATIVE_INT128 0
#endif

#include "int128.h"

/*
 * We assume the parts of this union are laid out compatibly.
 */
typedef union
{
    int128        i128;
    INT128        I128;
    union
    {
#ifdef WORDS_BIGENDIAN
        int64        hi;
        uint64        lo;
#else
        uint64        lo;
        int64        hi;
#endif
    }            hl;
} test128;


/*
 * Control version of comparator.
 */
static inline int
my_int128_compare(int128 x, int128 y)
{
    if (x < y)
        return -1;
    if (x > y)
        return 1;
    return 0;
}

/*
 * Get a random uint64 value.
 * We don't assume random() is good for more than 16 bits.
 */
static uint64
get_random_uint64(void)
{
    uint64        x;

    x = (uint64) (random() & 0xFFFF) << 48;
    x |= (uint64) (random() & 0xFFFF) << 32;
    x |= (uint64) (random() & 0xFFFF) << 16;
    x |= (uint64) (random() & 0xFFFF);
    return x;
}

/*
 * Main program.
 *
 * You can give it a loop count if you don't like the default 1B iterations.
 */
int
main(int argc, char **argv)
{
    long        count;

    if (argc >= 2)
        count = strtol(argv[1], NULL, 0);
    else
        count = 1000000000;

    while (count-- > 0)
    {
        int64        x = get_random_uint64();
        int64        y = get_random_uint64();
        int64        z = get_random_uint64();
        test128        t1;
        test128        t2;

        /* check unsigned addition */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2 = t1;
        t1.i128 += (int128) (uint64) z;
        int128_add_uint64(&t2.I128, (uint64) z);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%016lX%016lX + unsigned %lX\n", x, y, z);
            printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo);
            printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo);
            return 1;
        }

        /* check signed addition */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2 = t1;
        t1.i128 += (int128) z;
        int128_add_int64(&t2.I128, z);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%016lX%016lX + signed %lX\n", x, y, z);
            printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo);
            printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo);
            return 1;
        }

        /* check multiplication */
        t1.i128 = (int128) x *(int128) y;

        t2.hl.hi = t2.hl.lo = 0;
        int128_add_int64_mul_int64(&t2.I128, x, y);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%lX * %lX\n", x, y);
            printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo);
            printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo);
            return 1;
        }

        /* check comparison */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2.hl.hi = z;
        t2.hl.lo = get_random_uint64();

        if (my_int128_compare(t1.i128, t2.i128) !=
            int128_compare(t1.I128, t2.I128))
        {
            printf("comparison failure: %d vs %d\n",
                   my_int128_compare(t1.i128, t2.i128),
                   int128_compare(t1.I128, t2.I128));
            printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo);
            printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo);
            return 1;
        }

        /* check case with identical hi parts; above will hardly ever hit it */
        t2.hl.hi = x;

        if (my_int128_compare(t1.i128, t2.i128) !=
            int128_compare(t1.I128, t2.I128))
        {
            printf("comparison failure: %d vs %d\n",
                   my_int128_compare(t1.i128, t2.i128),
                   int128_compare(t1.I128, t2.I128));
            printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo);
            printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo);
            return 1;
        }
    }

    return 0;
}

pgsql-general by date:

Previous
From: "Joshua D. Drake"
Date:
Subject: Re: browser interface to forums please?
Next
From: Tom Lane
Date:
Subject: Re: Is there a point to having both a normal gist index and an exclude index?