[HACKERS] Assorted leaks of PGresults - Mailing list pgsql-hackers

From Tom Lane
Subject [HACKERS] Assorted leaks of PGresults
Date
Msg-id 22620.1497486981@sss.pgh.pa.us
Whole thread Raw
Responses Re: [HACKERS] Assorted leaks of PGresults
List pgsql-hackers
Coverity complained that libpqwalreceiver.c's libpqrcv_PQexec()
potentially leaks a PGresult.  It's right: if PQconsumeInput() fails after
we've already collected at least one PGresult, the function just returns
NULL without remembering to free lastResult.  That's easy enough to fix,
just insert "PQclear(lastResult)" before the return.  I do not think this
is a significant leak, because the walreceiver process would just exit
anyway after the failure.  But we ought to fix it in case somebody copies
the logic into someplace less forgiving.

In fact ... I looked through other callers of PQconsumeInput() to see if
we had the same wrong coding pattern elsewhere, and we do, in two places
in postgres_fdw/connection.c.  One of those is new as of last week, in
commit ae9bfc5d6.  What's worse, in those cases we have code inside the
loop that might throw elog(ERROR), resulting in a permanently leaked
PGresult --- and since the backend process would keep going, this could
possibly be repeated and build up to a leak that amounted to something
significant.  We need to have PG_TRY blocks in both these places that
ensure that any held PGresult gets freed before we exit the functions.

Further review showed an additional leak introduced by ae9bfc5d6, to wit
that pgfdw_exec_cleanup_query() just forgets to clear the PGresult
returned by pgfdw_get_cleanup_result() in its normal non-error path.

In short, I think we need the attached patch in HEAD.  It needs to be
back-patched too, though the code seems to be a bit different in the
back branches.

            regards, tom lane

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 1b691fb..9818d27 100644
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
*************** pgfdw_exec_query(PGconn *conn, const cha
*** 485,491 ****
   *
   * This function offers quick responsiveness by checking for any interruptions.
   *
!  * This function emulates the PQexec()'s behavior of returning the last result
   * when there are many.
   *
   * Caller is responsible for the error handling on the result.
--- 485,491 ----
   *
   * This function offers quick responsiveness by checking for any interruptions.
   *
!  * This function emulates PQexec()'s behavior of returning the last result
   * when there are many.
   *
   * Caller is responsible for the error handling on the result.
*************** pgfdw_exec_query(PGconn *conn, const cha
*** 493,532 ****
  PGresult *
  pgfdw_get_result(PGconn *conn, const char *query)
  {
!     PGresult   *last_res = NULL;

!     for (;;)
      {
!         PGresult   *res;
!
!         while (PQisBusy(conn))
          {
!             int            wc;

!             /* Sleep until there's something to do */
!             wc = WaitLatchOrSocket(MyLatch,
!                                    WL_LATCH_SET | WL_SOCKET_READABLE,
!                                    PQsocket(conn),
!                                    -1L, PG_WAIT_EXTENSION);
!             ResetLatch(MyLatch);

!             CHECK_FOR_INTERRUPTS();

!             /* Data available in socket */
!             if (wc & WL_SOCKET_READABLE)
!             {
!                 if (!PQconsumeInput(conn))
!                     pgfdw_report_error(ERROR, NULL, conn, false, query);
              }
-         }

!         res = PQgetResult(conn);
!         if (res == NULL)
!             break;                /* query is complete */

          PQclear(last_res);
!         last_res = res;
      }

      return last_res;
  }
--- 493,542 ----
  PGresult *
  pgfdw_get_result(PGconn *conn, const char *query)
  {
!     PGresult   *volatile last_res = NULL;

!     /* In what follows, do not leak any PGresults on an error. */
!     PG_TRY();
      {
!         for (;;)
          {
!             PGresult   *res;

!             while (PQisBusy(conn))
!             {
!                 int            wc;

!                 /* Sleep until there's something to do */
!                 wc = WaitLatchOrSocket(MyLatch,
!                                        WL_LATCH_SET | WL_SOCKET_READABLE,
!                                        PQsocket(conn),
!                                        -1L, PG_WAIT_EXTENSION);
!                 ResetLatch(MyLatch);

!                 CHECK_FOR_INTERRUPTS();
!
!                 /* Data available in socket? */
!                 if (wc & WL_SOCKET_READABLE)
!                 {
!                     if (!PQconsumeInput(conn))
!                         pgfdw_report_error(ERROR, NULL, conn, false, query);
!                 }
              }

!             res = PQgetResult(conn);
!             if (res == NULL)
!                 break;            /* query is complete */

+             PQclear(last_res);
+             last_res = res;
+         }
+     }
+     PG_CATCH();
+     {
          PQclear(last_res);
!         PG_RE_THROW();
      }
+     PG_END_TRY();

      return last_res;
  }
*************** pgfdw_exec_cleanup_query(PGconn *conn, c
*** 1006,1011 ****
--- 1016,1022 ----
          pgfdw_report_error(WARNING, result, conn, true, query);
          return ignore_errors;
      }
+     PQclear(result);

      return true;
  }
*************** pgfdw_exec_cleanup_query(PGconn *conn, c
*** 1028,1083 ****
  static bool
  pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
  {
!     PGresult   *last_res = NULL;

!     for (;;)
      {
!         PGresult   *res;
!
!         while (PQisBusy(conn))
          {
!             int            wc;
!             TimestampTz now = GetCurrentTimestamp();
!             long        secs;
!             int            microsecs;
!             long        cur_timeout;

!             /* If timeout has expired, give up, else get sleep time. */
!             if (now >= endtime)
!                 return true;
!             TimestampDifference(now, endtime, &secs, &microsecs);

!             /* To protect against clock skew, limit sleep to one minute. */
!             cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);

!             /* Sleep until there's something to do */
!             wc = WaitLatchOrSocket(MyLatch,
                                WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
!                                    PQsocket(conn),
!                                    cur_timeout, PG_WAIT_EXTENSION);
!             ResetLatch(MyLatch);

!             CHECK_FOR_INTERRUPTS();

!             /* Data available in socket */
!             if (wc & WL_SOCKET_READABLE)
!             {
!                 if (!PQconsumeInput(conn))
                  {
!                     *result = NULL;
!                     return false;
                  }
              }
-         }

!         res = PQgetResult(conn);
!         if (res == NULL)
!             break;                /* query is complete */

          PQclear(last_res);
!         last_res = res;
      }

!     *result = last_res;
!     return false;
  }
--- 1039,1113 ----
  static bool
  pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
  {
!     volatile bool timed_out = false;
!     PGresult   *volatile last_res = NULL;

!     /* In what follows, do not leak any PGresults on an error. */
!     PG_TRY();
      {
!         for (;;)
          {
!             PGresult   *res;

!             while (PQisBusy(conn))
!             {
!                 int            wc;
!                 TimestampTz now = GetCurrentTimestamp();
!                 long        secs;
!                 int            microsecs;
!                 long        cur_timeout;

!                 /* If timeout has expired, give up, else get sleep time. */
!                 if (now >= endtime)
!                 {
!                     timed_out = true;
!                     goto exit;
!                 }
!                 TimestampDifference(now, endtime, &secs, &microsecs);

!                 /* To protect against clock skew, limit sleep to one minute. */
!                 cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
!
!                 /* Sleep until there's something to do */
!                 wc = WaitLatchOrSocket(MyLatch,
                                WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
!                                        PQsocket(conn),
!                                        cur_timeout, PG_WAIT_EXTENSION);
!                 ResetLatch(MyLatch);

!                 CHECK_FOR_INTERRUPTS();

!                 /* Data available in socket? */
!                 if (wc & WL_SOCKET_READABLE)
                  {
!                     if (!PQconsumeInput(conn))
!                     {
!                         /* connection trouble; treat the same as a timeout */
!                         timed_out = true;
!                         goto exit;
!                     }
                  }
              }

!             res = PQgetResult(conn);
!             if (res == NULL)
!                 break;            /* query is complete */

+             PQclear(last_res);
+             last_res = res;
+         }
+ exit:    ;
+     }
+     PG_CATCH();
+     {
          PQclear(last_res);
!         PG_RE_THROW();
      }
+     PG_END_TRY();

!     if (timed_out)
!         PQclear(last_res);
!     else
!         *result = last_res;
!     return timed_out;
  }
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7509b4f..f6fa0e4 100644
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
*************** libpqrcv_PQexec(PGconn *streamConn, cons
*** 591,603 ****
                  ResetLatch(MyLatch);
                  CHECK_FOR_INTERRUPTS();
              }
              if (PQconsumeInput(streamConn) == 0)
!                 return NULL;    /* trouble */
          }

          /*
!          * Emulate the PQexec()'s behavior of returning the last result when
!          * there are many. We are fine with returning just last error message.
           */
          result = PQgetResult(streamConn);
          if (result == NULL)
--- 591,609 ----
                  ResetLatch(MyLatch);
                  CHECK_FOR_INTERRUPTS();
              }
+
+             /* Consume whatever data is available from the socket */
              if (PQconsumeInput(streamConn) == 0)
!             {
!                 /* trouble; drop whatever we had and return NULL */
!                 PQclear(lastResult);
!                 return NULL;
!             }
          }

          /*
!          * Emulate PQexec()'s behavior of returning the last result when there
!          * are many.  We are fine with returning just last error message.
           */
          result = PQgetResult(streamConn);
          if (result == NULL)

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

pgsql-hackers by date:

Previous
From: Mark Kirkwood
Date:
Subject: Re: [HACKERS] logical replication: \dRp+ and "for all tables"
Next
From: Masahiko Sawada
Date:
Subject: Re: [HACKERS] Get stuck when dropping a subscription during synchronizing table