Re: Pipeline mode and PQpipelineSync() - Mailing list pgsql-hackers

From Alvaro Herrera
Subject Re: Pipeline mode and PQpipelineSync()
Date
Msg-id 202107071730.vkyej2goquhp@alvherre.pgsql
Whole thread Raw
In response to Re: Pipeline mode and PQpipelineSync()  (Boris Kolpackov <boris@codesynthesis.com>)
Responses Re: Pipeline mode and PQpipelineSync()  (Boris Kolpackov <boris@codesynthesis.com>)
List pgsql-hackers
On 2021-Jul-07, Boris Kolpackov wrote:

>     // Try to minimize the chance of blocking the server by first processing
>     // the result and then sending more queries.
>     //
>     if (FD_ISSET (sock, &rds))
>     {
>       if (PQconsumeInput (conn) == 0)
>         assert (false);
> 
>       while (PQisBusy (conn) == 0)
>       {
>         //fprintf (stderr, "PQgetResult %zu\n", rn);
> 
>         PGresult* res = PQgetResult (conn);
>         assert (res != NULL);
>         ExecStatusType stat = PQresultStatus (res);

Hmm ... aren't you trying to read more results than you sent queries?  I
think there should be a break out of that block when that happens (which
means the read of the PGRES_PIPELINE_SYNC needs to be out of there too).
With this patch, the program seems to work well for me.

***************
*** 94,112 ****
        while (PQisBusy (conn) == 0)
        {
          //fprintf (stderr, "PQgetResult %zu\n", rn);
  
          PGresult* res = PQgetResult (conn);
          assert (res != NULL);
          ExecStatusType stat = PQresultStatus (res);
  
-         if (stat == PGRES_PIPELINE_SYNC)
-         {
-           assert (wdone && rn == n);
-           PQclear (res);
-           rdone = true;
-           break;
-         }
- 
          if (stat == PGRES_FATAL_ERROR)
          {
            const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
--- 94,110 ----
        while (PQisBusy (conn) == 0)
        {
          //fprintf (stderr, "PQgetResult %zu\n", rn);
+         if (rn >= wn)
+         {
+           if (wdone)
+             rdone = true;
+           break;
+         }
  
          PGresult* res = PQgetResult (conn);
          assert (res != NULL);
          ExecStatusType stat = PQresultStatus (res);
  
          if (stat == PGRES_FATAL_ERROR)
          {
            const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
***************
*** 190,195 ****
--- 188,201 ----
          break; // Blocked or done.
        }
      }
+ 
+     if (rdone)
+     {
+       PGresult *res = PQgetResult(conn);
+       assert(PQresultStatus(res) == PGRES_PIPELINE_SYNC);
+       PQclear(res);
+       break;
+     }
    }
  
    if (PQexitPipelineMode (conn) == 0 ||
***************
*** 246,248 ****
--- 252,269 ----
      PQclear (res);
    }
  }
+ 
+ int main(int argc, char **argv)
+ {
+   PGconn *conn = PQconnectdb("");
+   if (PQstatus(conn) != CONNECTION_OK)
+   {
+     fprintf(stderr, "connection failed: %s\n",
+             PQerrorMessage(conn));
+     return 1;
+   }
+ 
+   test(conn);
+ }
+ 
+ 


-- 
Álvaro Herrera              Valdivia, Chile  —  https://www.EnterpriseDB.com/



pgsql-hackers by date:

Previous
From: Gilles Darold
Date:
Subject: Re: Case expression pushdown
Next
From: Ranier Vilela
Date:
Subject: re: Why ALTER SUBSCRIPTION ... SET (slot_name='none') requires subscription disabled?