\crosstabview fixes - Mailing list pgsql-hackers

From Tom Lane
Subject \crosstabview fixes
Date
Msg-id 10276.1460569515@sss.pgh.pa.us
Whole thread Raw
Responses Re: \crosstabview fixes  ("Daniel Verite" <daniel@manitou-mail.org>)
Re: \crosstabview fixes  (Alvaro Herrera <alvherre@2ndquadrant.com>)
List pgsql-hackers
I noticed that the \crosstabview documentation asserts that column name
arguments are handled per standard SQL semantics.  In point of fact,
though, the patch expends a couple hundred lines to implement what is
NOT standard SQL semantics: matching unquoted names case-insensitively
is anything but that.  I think we should rip all that out and do it as
per attached.  (I also took the trouble to make the error messages conform
to project style.)

Comments/objections?

            regards, tom lane

diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 227d180..b88f706 100644
*** a/src/bin/psql/command.c
--- b/src/bin/psql/command.c
*************** exec_command(const char *cmd,
*** 369,379 ****
      else if (strcmp(cmd, "crosstabview") == 0)
      {
          pset.ctv_col_V = psql_scan_slash_option(scan_state,
!                                                 OT_NORMAL, NULL, false);
          pset.ctv_col_H = psql_scan_slash_option(scan_state,
!                                                 OT_NORMAL, NULL, false);
          pset.ctv_col_D = psql_scan_slash_option(scan_state,
!                                                 OT_NORMAL, NULL, false);

          pset.crosstab_flag = true;
          status = PSQL_CMD_SEND;
--- 369,379 ----
      else if (strcmp(cmd, "crosstabview") == 0)
      {
          pset.ctv_col_V = psql_scan_slash_option(scan_state,
!                                                 OT_SQLID, NULL, true);
          pset.ctv_col_H = psql_scan_slash_option(scan_state,
!                                                 OT_SQLID, NULL, true);
          pset.ctv_col_D = psql_scan_slash_option(scan_state,
!                                                 OT_SQLID, NULL, true);

          pset.crosstab_flag = true;
          status = PSQL_CMD_SEND;
diff --git a/src/bin/psql/crosstabview.c b/src/bin/psql/crosstabview.c
index 3cc15ed..7889f63 100644
*** a/src/bin/psql/crosstabview.c
--- b/src/bin/psql/crosstabview.c
*************** static bool printCrosstab(const PGresult
*** 82,97 ****
                int num_columns, pivot_field *piv_columns, int field_for_columns,
                int num_rows, pivot_field *piv_rows, int field_for_rows,
                int field_for_data);
- static int parseColumnRefs(const char *arg, const PGresult *res,
-                 int **col_numbers,
-                 int max_columns, char separator);
  static void avlInit(avl_tree *tree);
  static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
  static int avlCollectFields(avl_tree *tree, avl_node *node,
                   pivot_field *fields, int idx);
  static void avlFree(avl_tree *tree, avl_node *node);
  static void rankSort(int num_columns, pivot_field *piv_columns);
! static int    indexOfColumn(const char *arg, const PGresult *res);
  static int    pivotFieldCompare(const void *a, const void *b);
  static int    rankCompare(const void *a, const void *b);

--- 82,95 ----
                int num_columns, pivot_field *piv_columns, int field_for_columns,
                int num_rows, pivot_field *piv_rows, int field_for_rows,
                int field_for_data);
  static void avlInit(avl_tree *tree);
  static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
  static int avlCollectFields(avl_tree *tree, avl_node *node,
                   pivot_field *fields, int idx);
  static void avlFree(avl_tree *tree, avl_node *node);
  static void rankSort(int num_columns, pivot_field *piv_columns);
! static int indexOfColumn(const char *arg, const PGresult *res,
!               bool printError);
  static int    pivotFieldCompare(const void *a, const void *b);
  static int    rankCompare(const void *a, const void *b);

*************** PrintResultsInCrosstab(const PGresult *r
*** 116,125 ****
      pivot_field *array_rows = NULL;
      int            num_columns = 0;
      int            num_rows = 0;
-     int           *colsV = NULL,
-                *colsH = NULL,
-                *colsD = NULL;
-     int            n;
      int            field_for_columns;
      int            sort_field_for_columns = -1;
      int            field_for_rows;
--- 114,119 ----
*************** PrintResultsInCrosstab(const PGresult *r
*** 129,155 ****
      avlInit(&piv_rows);
      avlInit(&piv_columns);

-     if (res == NULL)
-     {
-         psql_error(_("No result\n"));
-         goto error_return;
-     }
-
      if (PQresultStatus(res) != PGRES_TUPLES_OK)
      {
!         psql_error(_("The query must return results to be shown in crosstab\n"));
!         goto error_return;
!     }
!
!     if (opt_field_for_rows && !opt_field_for_columns)
!     {
!         psql_error(_("A second column must be specified for the horizontal header\n"));
          goto error_return;
      }

!     if (PQnfields(res) <= 2)
      {
!         psql_error(_("The query must return at least two columns to be shown in crosstab\n"));
          goto error_return;
      }

--- 123,137 ----
      avlInit(&piv_rows);
      avlInit(&piv_columns);

      if (PQresultStatus(res) != PGRES_TUPLES_OK)
      {
!         psql_error(_("query must return results to be shown in crosstab\n"));
          goto error_return;
      }

!     if (PQnfields(res) < 3)
      {
!         psql_error(_("query must return at least three columns to be shown in crosstab\n"));
          goto error_return;
      }

*************** PrintResultsInCrosstab(const PGresult *r
*** 158,209 ****
       * left-most column. Only a reference to a field is accepted (no sort
       * column).
       */
-
      if (opt_field_for_rows == NULL)
      {
          field_for_rows = 0;
      }
      else
      {
!         n = parseColumnRefs(opt_field_for_rows, res, &colsV, 1, ':');
!         if (n != 1)
              goto error_return;
-         field_for_rows = colsV[0];
      }

-     if (field_for_rows < 0)
-         goto error_return;
-
      /*----------
       * Arguments processing for the horizontal header (2nd arg)
       * (pivoted column that gets displayed as the first row).
       * Determine:
       * - the field number for the horizontal header column
       * - the field number of the associated sort column, if any
       */
-
      if (opt_field_for_columns == NULL)
          field_for_columns = 1;
      else
      {
!         n = parseColumnRefs(opt_field_for_columns, res, &colsH, 2, ':');
!         if (n <= 0)
!             goto error_return;
!         if (n == 1)
!             field_for_columns = colsH[0];
          else
          {
!             field_for_columns = colsH[0];
!             sort_field_for_columns = colsH[1];
          }
-
-         if (field_for_columns < 0)
-             goto error_return;
      }

      if (field_for_columns == field_for_rows)
      {
!         psql_error(_("The same column cannot be used for both vertical and horizontal headers\n"));
          goto error_return;
      }

--- 140,200 ----
       * left-most column. Only a reference to a field is accepted (no sort
       * column).
       */
      if (opt_field_for_rows == NULL)
      {
          field_for_rows = 0;
      }
      else
      {
!         field_for_rows = indexOfColumn(opt_field_for_rows, res, true);
!         if (field_for_rows < 0)
              goto error_return;
      }

      /*----------
       * Arguments processing for the horizontal header (2nd arg)
       * (pivoted column that gets displayed as the first row).
       * Determine:
       * - the field number for the horizontal header column
       * - the field number of the associated sort column, if any
+      *----------
       */
      if (opt_field_for_columns == NULL)
          field_for_columns = 1;
      else
      {
!         char       *colonpos = strchr(opt_field_for_columns, ':');
!
!         if (colonpos == NULL)
!         {
!             /* No colon, so must be a simple column ID */
!             field_for_columns = indexOfColumn(opt_field_for_columns, res, true);
!             if (field_for_columns < 0)
!                 goto error_return;
!         }
          else
          {
!             /* Try it as simple column ID, but don't demand a match */
!             field_for_columns = indexOfColumn(opt_field_for_columns, res, false);
!             if (field_for_columns < 0)
!             {
!                 /* Split at the colon, and demand matches */
!                 /* It's okay to trash pset.ctv_col_H here */
!                 *colonpos++ = '\0';
!                 field_for_columns = indexOfColumn(opt_field_for_columns, res, true);
!                 if (field_for_columns < 0)
!                     goto error_return;
!                 sort_field_for_columns = indexOfColumn(colonpos, res, true);
!                 if (sort_field_for_columns < 0)
!                     goto error_return;
!             }
          }
      }

+     /* Insist that header columns be distinct */
      if (field_for_columns == field_for_rows)
      {
!         psql_error(_("vertical and horizontal headers must be different columns\n"));
          goto error_return;
      }

*************** PrintResultsInCrosstab(const PGresult *r
*** 217,228 ****

          /*
           * If the data column was not specified, we search for the one not
!          * used as either vertical or horizontal headers.  If the result has
!          * more than three columns, raise an error.
           */
!         if (PQnfields(res) > 3)
          {
!             psql_error(_("Data column must be specified when the result set has more than three columns\n"));
              goto error_return;
          }

--- 208,219 ----

          /*
           * If the data column was not specified, we search for the one not
!          * used as either vertical or horizontal headers.  Must be exactly
!          * three columns, or this won't be unique.
           */
!         if (PQnfields(res) != 3)
          {
!             psql_error(_("data column must be specified when result set has more than three columns\n"));
              goto error_return;
          }

*************** PrintResultsInCrosstab(const PGresult *r
*** 238,250 ****
      }
      else
      {
!         int        num_fields;
!
!         /* If a field was given, find out what it is.  Only one is allowed. */
!         num_fields = parseColumnRefs(opt_field_for_data, res, &colsD, 1, ',');
!         if (num_fields < 1)
              goto error_return;
-         field_for_data = colsD[0];
      }

      /*
--- 229,237 ----
      }
      else
      {
!         field_for_data = indexOfColumn(opt_field_for_data, res, true);
!         if (field_for_data < 0)
              goto error_return;
      }

      /*
*************** PrintResultsInCrosstab(const PGresult *r
*** 271,277 ****

          if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
          {
!             psql_error(_("Maximum number of columns (%d) exceeded\n"),
                         CROSSTABVIEW_MAX_COLUMNS);
              goto error_return;
          }
--- 258,264 ----

          if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
          {
!             psql_error(_("maximum number of crosstab columns (%d) exceeded\n"),
                         CROSSTABVIEW_MAX_COLUMNS);
              goto error_return;
          }
*************** error_return:
*** 319,327 ****
      avlFree(&piv_rows, piv_rows.root);
      pg_free(array_columns);
      pg_free(array_rows);
-     pg_free(colsV);
-     pg_free(colsH);
-     pg_free(colsD);

      return retval;
  }
--- 306,311 ----
*************** printCrosstab(const PGresult *results,
*** 442,448 ****
               */
              if (cont.cells[idx] != NULL)
              {
!                 psql_error(_("data cell already contains a value: (row: \"%s\", column: \"%s\")\n"),
                               piv_rows[row_number].name ? piv_rows[row_number].name :
                               popt.nullPrint ? popt.nullPrint : "(null)",
                               piv_columns[col_number].name ? piv_columns[col_number].name :
--- 426,432 ----
               */
              if (cont.cells[idx] != NULL)
              {
!                 psql_error(_("query contains multiple data values for row \"%s\", column \"%s\"\n"),
                               piv_rows[row_number].name ? piv_rows[row_number].name :
                               popt.nullPrint ? popt.nullPrint : "(null)",
                               piv_columns[col_number].name ? piv_columns[col_number].name :
*************** error:
*** 476,583 ****
  }

  /*
-  * Parse "arg", which is a string of column IDs separated by "separator".
-  *
-  * Each column ID can be:
-  * - a number from 1 to PQnfields(res)
-  * - an unquoted column name matching (case insensitively) one of PQfname(res,...)
-  * - a quoted column name matching (case sensitively) one of PQfname(res,...)
-  *
-  * If max_columns > 0, it is the max number of column IDs allowed.
-  *
-  * On success, return number of column IDs found (possibly 0), and return a
-  * malloc'd array of the matching column numbers of "res" into *col_numbers.
-  *
-  * On failure, return -1 and set *col_numbers to NULL.
-  */
- static int
- parseColumnRefs(const char *arg,
-                 const PGresult *res,
-                 int **col_numbers,
-                 int max_columns,
-                 char separator)
- {
-     const char *p = arg;
-     char        c;
-     int            num_cols = 0;
-
-     *col_numbers = NULL;
-     while ((c = *p) != '\0')
-     {
-         const char *field_start = p;
-         bool        quoted_field = false;
-
-         /* first char */
-         if (c == '"')
-         {
-             quoted_field = true;
-             p++;
-         }
-
-         while ((c = *p) != '\0')
-         {
-             if (c == separator && !quoted_field)
-                 break;
-             if (c == '"')        /* end of field or embedded double quote */
-             {
-                 p++;
-                 if (*p == '"')
-                 {
-                     if (quoted_field)
-                     {
-                         p++;
-                         continue;
-                     }
-                 }
-                 else if (quoted_field && *p == separator)
-                     break;
-             }
-             if (*p)
-                 p += PQmblen(p, pset.encoding);
-         }
-
-         if (p != field_start)
-         {
-             char   *col_name;
-             int        col_num;
-
-             /* enforce max_columns limit */
-             if (max_columns > 0 && num_cols == max_columns)
-             {
-                 psql_error(_("No more than %d column references expected\n"),
-                            max_columns);
-                 goto errfail;
-             }
-             /* look up the column and add its index into *col_numbers */
-             col_name = pg_malloc(p - field_start + 1);
-             memcpy(col_name, field_start, p - field_start);
-             col_name[p - field_start] = '\0';
-             col_num = indexOfColumn(col_name, res);
-             pg_free(col_name);
-             if (col_num < 0)
-                 goto errfail;
-             *col_numbers = (int *) pg_realloc(*col_numbers,
-                                               (num_cols + 1) * sizeof(int));
-             (*col_numbers)[num_cols++] = col_num;
-         }
-         else
-         {
-             psql_error(_("Empty column reference\n"));
-             goto errfail;
-         }
-
-         if (*p)
-             p += PQmblen(p, pset.encoding);
-     }
-     return num_cols;
-
- errfail:
-     pg_free(*col_numbers);
-     *col_numbers = NULL;
-     return -1;
- }
-
- /*
   * The avl* functions below provide a minimalistic implementation of AVL binary
   * trees, to efficiently collect the distinct values that will form the horizontal
   * and vertical headers. It only supports adding new values, no removal or even
--- 460,465 ----
*************** rankSort(int num_columns, pivot_field *p
*** 773,833 ****
  }

  /*
!  * Compare a user-supplied argument against a field name obtained by PQfname(),
!  * which is already case-folded.
!  * If arg is not enclosed in double quotes, pg_strcasecmp applies, otherwise
!  * do a case-sensitive comparison with these rules:
!  * - double quotes enclosing 'arg' are filtered out
!  * - double quotes inside 'arg' are expected to be doubled
!  */
! static bool
! fieldNameEquals(const char *arg, const char *fieldname)
! {
!     const char *p = arg;
!     const char *f = fieldname;
!     char        c;
!
!     if (*p++ != '"')
!         return (pg_strcasecmp(arg, fieldname) == 0);
!
!     while ((c = *p++))
!     {
!         if (c == '"')
!         {
!             if (*p == '"')
!                 p++;            /* skip second quote and continue */
!             else if (*p == '\0')
!                 return (*f == '\0');    /* p is shorter than f, or is
!                                          * identical */
!         }
!         if (*f == '\0')
!             return false;        /* f is shorter than p */
!         if (c != *f)            /* found one byte that differs */
!             return false;
!         f++;
!     }
!     return (*f == '\0');
! }
!
! /*
!  * arg can be a number or a column name, possibly quoted (like in an ORDER BY clause)
!  * Returns:
!  *    on success, the 0-based index of the column
!  *    or -1 if the column number or name is not found in the result's structure,
!  *          or if it's ambiguous (arg corresponding to several columns)
   */
  static int
! indexOfColumn(const char *arg, const PGresult *res)
  {
      int            idx;

!     if (strspn(arg, "0123456789") == strlen(arg))
      {
          /* if arg contains only digits, it's a column number */
          idx = atoi(arg) - 1;
          if (idx < 0 || idx >= PQnfields(res))
          {
!             psql_error(_("Invalid column number: %s\n"), arg);
              return -1;
          }
      }
--- 655,680 ----
  }

  /*
!  * Look up a column reference, which can be either:
!  * - a number from 1 to PQnfields(res)
!  * - a column name matching one of PQfname(res,...)
!  *
!  * Returns zero-based column number, or -1 if not found or ambiguous.
!  * On failure return, a suitable error is printed if printError is true.
   */
  static int
! indexOfColumn(const char *arg, const PGresult *res, bool printError)
  {
      int            idx;

!     if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
      {
          /* if arg contains only digits, it's a column number */
          idx = atoi(arg) - 1;
          if (idx < 0 || idx >= PQnfields(res))
          {
!             if (printError)
!                 psql_error(_("invalid column number: \"%s\"\n"), arg);
              return -1;
          }
      }
*************** indexOfColumn(const char *arg, const PGr
*** 838,849 ****
          idx = -1;
          for (i = 0; i < PQnfields(res); i++)
          {
!             if (fieldNameEquals(arg, PQfname(res, i)))
              {
                  if (idx >= 0)
                  {
!                     /* if another idx was already found for the same name */
!                     psql_error(_("Ambiguous column name: %s\n"), arg);
                      return -1;
                  }
                  idx = i;
--- 685,697 ----
          idx = -1;
          for (i = 0; i < PQnfields(res); i++)
          {
!             if (strcmp(arg, PQfname(res, i)) == 0)
              {
                  if (idx >= 0)
                  {
!                     /* another idx was already found for the same name */
!                     if (printError)
!                         psql_error(_("ambiguous column name: \"%s\"\n"), arg);
                      return -1;
                  }
                  idx = i;
*************** indexOfColumn(const char *arg, const PGr
*** 851,857 ****
          }
          if (idx == -1)
          {
!             psql_error(_("Invalid column name: %s\n"), arg);
              return -1;
          }
      }
--- 699,706 ----
          }
          if (idx == -1)
          {
!             if (printError)
!                 psql_error(_("column name not found: \"%s\"\n"), arg);
              return -1;
          }
      }
diff --git a/src/test/regress/expected/psql_crosstab.out b/src/test/regress/expected/psql_crosstab.out
index c87c2fc..6e7fd08 100644
*** a/src/test/regress/expected/psql_crosstab.out
--- b/src/test/regress/expected/psql_crosstab.out
*************** FROM ctv_data GROUP BY v, h ORDER BY 1,3
*** 112,118 ****

  -- only null, no column name, 2 columns: error
  SELECT null,null \crosstabview
! The query must return at least two columns to be shown in crosstab
  -- only null, no column name, 3 columns: works
  SELECT null,null,null \crosstabview
   ?column? |
--- 112,118 ----

  -- only null, no column name, 2 columns: error
  SELECT null,null \crosstabview
! query must return at least three columns to be shown in crosstab
  -- only null, no column name, 3 columns: works
  SELECT null,null,null \crosstabview
   ?column? |
*************** FROM ctv_data GROUP BY v, h ORDER BY h,v
*** 166,185 ****
  -- error: bad column name
  SELECT v,h,c,i FROM ctv_data
   \crosstabview v h j
! Invalid column name: j
  -- error: bad column number
  SELECT v,h,i,c FROM ctv_data
   \crosstabview 2 1 5
! Invalid column number: 5
  -- error: same H and V columns
  SELECT v,h,i,c FROM ctv_data
   \crosstabview 2 h 4
! The same column cannot be used for both vertical and horizontal headers
  -- error: too many columns
  SELECT a,a,1 FROM generate_series(1,3000) AS a
   \crosstabview
! Maximum number of columns (1600) exceeded
  -- error: only one column
  SELECT 1 \crosstabview
! The query must return at least two columns to be shown in crosstab
  DROP TABLE ctv_data;
--- 166,185 ----
  -- error: bad column name
  SELECT v,h,c,i FROM ctv_data
   \crosstabview v h j
! column name not found: "j"
  -- error: bad column number
  SELECT v,h,i,c FROM ctv_data
   \crosstabview 2 1 5
! invalid column number: "5"
  -- error: same H and V columns
  SELECT v,h,i,c FROM ctv_data
   \crosstabview 2 h 4
! vertical and horizontal headers must be different columns
  -- error: too many columns
  SELECT a,a,1 FROM generate_series(1,3000) AS a
   \crosstabview
! maximum number of crosstab columns (1600) exceeded
  -- error: only one column
  SELECT 1 \crosstabview
! query must return at least three columns to be shown in crosstab
  DROP TABLE ctv_data;

pgsql-hackers by date:

Previous
From: Robert Haas
Date:
Subject: Re: Odd system-column handling in postgres_fdw join pushdown patch
Next
From: Tom Lane
Date:
Subject: Re: SET ROLE and reserved roles