From 289a9a759683c4f36d5bc6960dd74aa37049d061 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Fri, 5 Dec 2025 18:20:23 -0500 Subject: [PATCH] wip holdable portals update docs for new protocol message add function PQsendBindWithCursorOptions to allow cursors with options to be created and fix test to work properly --- doc/src/sgml/protocol.sgml | 40 +++- src/backend/tcop/postgres.c | 36 +++ src/include/libpq/pqcomm.h | 2 +- src/interfaces/libpq/exports.txt | 2 + src/interfaces/libpq/fe-connect.c | 5 + src/interfaces/libpq/fe-exec.c | 222 ++++++++++++++++++ src/interfaces/libpq/libpq-fe.h | 8 + .../modules/libpq_pipeline/libpq_pipeline.c | 90 +++++++ 8 files changed, 399 insertions(+), 6 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 41c5954a424..3871a8cdf93 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -18,7 +18,7 @@ - This document describes version 3.2 of the protocol, introduced in + This document describes version 3.3 of the protocol, introduced in PostgreSQL version 18. The server and the libpq client library are backwards compatible with protocol version 3.0, implemented in PostgreSQL 7.4 and later. @@ -192,7 +192,7 @@ Protocol Versions - The current, latest version of the protocol is version 3.2. However, for + The current, latest version of the protocol is version 3.3. However, for backwards compatibility with old server versions and middleware that don't support the version negotiation yet, libpq still uses protocol version 3.0 by default. @@ -206,7 +206,7 @@ this would occur if the client requested protocol version 4.0, which does not exist as of this writing). If the minor version requested by the client is not supported by the server (e.g., the client requests version - 3.2, but the server supports only 3.0), the server may either reject the + 3.3, but the server supports only 3.0), the server may either reject the connection or may respond with a NegotiateProtocolVersion message containing the highest minor protocol version which it supports. The client may then choose either to continue with the connection using the @@ -238,10 +238,18 @@ + + 3.3 + PostgreSQL 18 and later + Current latest version. The Bind message now supports an optional + cursor options field to control portal behavior, including the ability + to create holdable portals that survive transaction commit. + + 3.2 PostgreSQL 18 and later - Current latest version. The secret key used in query + The secret key used in query cancellation was enlarged from 4 bytes to a variable length field. The BackendKeyData message was changed to accommodate that, and the CancelRequest message was redefined to have a variable length payload. @@ -981,6 +989,9 @@ SELCT 1/0; pass NULL values for them in the Bind message.) Bind also specifies the format to use for any data returned by the query; the format can be specified overall, or per-column. + In protocol 3.3 and later, Bind can optionally specify cursor options + to control portal behavior, such as creating a holdable portal that + survives transaction commit. The response is either BindComplete or ErrorResponse. @@ -1005,7 +1016,10 @@ SELCT 1/0; If successfully created, a named portal object lasts till the end of the - current transaction, unless explicitly destroyed. An unnamed portal is + current transaction, unless explicitly destroyed. However, a portal + created with the CURSOR_OPT_HOLD option (protocol 3.3 and later) is + holdable and survives transaction commit, remaining + valid until explicitly closed or the session ends. An unnamed portal is destroyed at the end of the transaction, or as soon as the next Bind statement specifying the unnamed portal as destination is issued. (Note that a simple Query message also destroys the unnamed portal.) Named @@ -4292,6 +4306,22 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Int32 + + + Cursor options (protocol 3.3 and later). A bitmask of options + for the portal being created. Currently defined bits are: + 0x0001 (CURSOR_OPT_BINARY, same as setting + result format codes to binary), + 0x0020 (CURSOR_OPT_HOLD, creates a holdable + portal that survives transaction commit). + This field is optional; if not present, no cursor options are set. + Named portals are required when using CURSOR_OPT_HOLD. + + + diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7dd75a490aa..7e6a45cc8b9 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1636,6 +1636,7 @@ exec_bind_message(StringInfo input_message) int numParams; int numRFormats; int16 *rformats = NULL; + int cursorOptions = 0; CachedPlanSource *psrc; CachedPlan *cplan; Portal portal; @@ -2013,6 +2014,12 @@ exec_bind_message(StringInfo input_message) rformats[i] = pq_getmsgint(input_message, 2); } + /* Get cursor options if present (protocol 3.3+) */ + if (input_message->cursor < input_message->len) + { + cursorOptions = pq_getmsgint(input_message, 4); + elog(DEBUG1, "exec_bind_message: read cursorOptions=0x%04x from message", cursorOptions); + } pq_getmsgend(input_message); /* @@ -2061,6 +2068,26 @@ exec_bind_message(StringInfo input_message) */ PortalSetResultFormat(portal, numRFormats, rformats); + /* Apply cursor options */ + if (cursorOptions & CURSOR_OPT_HOLD) + { + elog(DEBUG1, "exec_bind_message: applying CURSOR_OPT_HOLD to portal '%s'", portal_name); + + if (portal_name[0] == '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_NAME), + errmsg("holdable cursors require a named portal"))); + if (InSecurityRestrictedOperation()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("cannot create cursor WITH HOLD in restricted operation"))); + + elog(DEBUG1, "exec_bind_message: CURSOR_OPT_HOLD validation passed for portal '%s'", portal_name); + } + + portal->cursorOptions = cursorOptions; + elog(DEBUG1, "exec_bind_message: portal '%s' cursorOptions set to 0x%04x", portal_name, cursorOptions); + /* * Done binding; remove the parameters error callback. Entries emitted * later determine independently whether to log the parameters or not. @@ -4908,7 +4935,16 @@ PostgresMain(const char *dbname, const char *username) portal = GetPortalByName(close_target); if (PortalIsValid(portal)) + { + elog(DEBUG1, "Close message: closing portal '%s' (cursorOptions=0x%04x)", + close_target, portal->cursorOptions); PortalDrop(portal, false); + elog(DEBUG1, "Close message: portal '%s' closed successfully", close_target); + } + else + { + elog(DEBUG1, "Close message: portal '%s' not found", close_target); + } } break; default: diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 625f4b43879..dd80da3809a 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -92,7 +92,7 @@ is_unixsock_path(const char *path) * The earliest and latest frontend/backend protocol version supported. */ #define PG_PROTOCOL_EARLIEST PG_PROTOCOL(3,0) -#define PG_PROTOCOL_LATEST PG_PROTOCOL(3,2) +#define PG_PROTOCOL_LATEST PG_PROTOCOL(3,3) /* * Reserved protocol numbers, which have special semantics: diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index dbbae642d76..b01c0948585 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -210,3 +210,5 @@ PQgetAuthDataHook 207 PQdefaultAuthDataHook 208 PQfullProtocolVersion 209 appendPQExpBufferVA 210 +PQsendQueryPreparedWithCursorOptions 211 +PQsendBindWithCursorOptions 212 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index c3a2448dce5..019f62631f2 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -8336,6 +8336,11 @@ pqParseProtocolVersion(const char *value, ProtocolVersion *result, PGconn *conn, *result = PG_PROTOCOL(3, 2); return true; } + if (strcmp(value, "3.3") == 0) + { + *result = PG_PROTOCOL(3, 3); + return true; + } libpq_append_conn_error(conn, "invalid %s value: \"%s\"", context, value); diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 7ab33930a39..1804365cd18 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1682,6 +1682,228 @@ PQsendQueryPrepared(PGconn *conn, resultFormat); } +int +PQsendQueryPreparedWithCursorOptions(PGconn *conn, + const char *stmtName, + int nParams, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + const char *portalName, + int cursorOptions) +{ + PGcmdQueueEntry *entry; + + if (!PQsendQueryStart(conn, true)) + return 0; + + if (!stmtName) + { + libpq_append_conn_error(conn, "statement name is a null pointer"); + return 0; + } + + if ((cursorOptions & 0x0020) && (!portalName || portalName[0] == '\0')) + { + libpq_append_conn_error(conn, "holdable cursors require a named portal"); + return 0; + } + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; + + if (pqPutMsgStart(PqMsg_Bind, conn) < 0 || + pqPuts(portalName ? portalName : "", conn) < 0 || + pqPuts(stmtName, conn) < 0) + goto sendFailed; + + if (nParams > 0 && paramFormats) + { + if (pqPutInt(nParams, 2, conn) < 0) + goto sendFailed; + for (int i = 0; i < nParams; i++) + if (pqPutInt(paramFormats[i], 2, conn) < 0) + goto sendFailed; + } + else if (pqPutInt(0, 2, conn) < 0) + goto sendFailed; + + if (pqPutInt(nParams, 2, conn) < 0) + goto sendFailed; + + for (int i = 0; i < nParams; i++) + { + if (paramValues && paramValues[i]) + { + int len = paramLengths ? paramLengths[i] : strlen(paramValues[i]); + if (pqPutInt(len, 4, conn) < 0 || + pqPutnchar(paramValues[i], len, conn) < 0) + goto sendFailed; + } + else if (pqPutInt(-1, 4, conn) < 0) + goto sendFailed; + } + + if (pqPutInt(1, 2, conn) < 0 || + pqPutInt(resultFormat, 2, conn) < 0) + goto sendFailed; + + /* Send cursor options if protocol 3.3+ */ + if (conn->pversion >= PG_PROTOCOL(3, 3)) + { + if (pqPutInt(cursorOptions, 4, conn) < 0) + goto sendFailed; + } + + if (pqPutMsgEnd(conn) < 0) + goto sendFailed; + + if (pqPutMsgStart(PqMsg_Describe, conn) < 0 || + pqPutc('P', conn) < 0 || + pqPuts(portalName ? portalName : "", conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + if (pqPutMsgStart(PqMsg_Execute, conn) < 0 || + pqPuts(portalName ? portalName : "", conn) < 0 || + pqPutInt(0, 4, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart(PqMsg_Sync, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } + + entry->queryclass = PGQUERY_EXTENDED; + + if (pqPipelineFlush(conn) < 0) + goto sendFailed; + + conn->asyncStatus = PGASYNC_BUSY; + return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + return 0; +} + +/* + * PQsendBindWithCursorOptions + * Like PQsendQueryPreparedWithCursorOptions but sends only Bind+Describe, + * not Execute. This allows creating a portal that can be executed later, + * which is necessary for testing holdable portals (execute after commit). + */ +int +PQsendBindWithCursorOptions(PGconn *conn, + const char *stmtName, + int nParams, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + const char *portalName, + int cursorOptions) +{ + PGcmdQueueEntry *entry; + + if (!PQsendQueryStart(conn, true)) + return 0; + + if (!stmtName) + { + libpq_append_conn_error(conn, "statement name is a null pointer"); + return 0; + } + + if ((cursorOptions & 0x0020) && (!portalName || portalName[0] == '\0')) + { + libpq_append_conn_error(conn, "holdable cursors require a named portal"); + return 0; + } + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; + + if (pqPutMsgStart(PqMsg_Bind, conn) < 0 || + pqPuts(portalName ? portalName : "", conn) < 0 || + pqPuts(stmtName, conn) < 0) + goto sendFailed; + + if (nParams > 0 && paramFormats) + { + if (pqPutInt(nParams, 2, conn) < 0) + goto sendFailed; + for (int i = 0; i < nParams; i++) + if (pqPutInt(paramFormats[i], 2, conn) < 0) + goto sendFailed; + } + else if (pqPutInt(0, 2, conn) < 0) + goto sendFailed; + + if (pqPutInt(nParams, 2, conn) < 0) + goto sendFailed; + + for (int i = 0; i < nParams; i++) + { + if (paramValues && paramValues[i]) + { + int len = paramLengths ? paramLengths[i] : strlen(paramValues[i]); + if (pqPutInt(len, 4, conn) < 0 || + pqPutnchar(paramValues[i], len, conn) < 0) + goto sendFailed; + } + else if (pqPutInt(-1, 4, conn) < 0) + goto sendFailed; + } + + if (pqPutInt(1, 2, conn) < 0 || + pqPutInt(resultFormat, 2, conn) < 0) + goto sendFailed; + + /* Send cursor options if protocol 3.3+ */ + if (conn->pversion >= PG_PROTOCOL(3, 3)) + { + if (pqPutInt(cursorOptions, 4, conn) < 0) + goto sendFailed; + } + + if (pqPutMsgEnd(conn) < 0) + goto sendFailed; + + if (pqPutMsgStart(PqMsg_Describe, conn) < 0 || + pqPutc('P', conn) < 0 || + pqPuts(portalName ? portalName : "", conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + /* No Execute message - portal is created but not executed */ + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart(PqMsg_Sync, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } + + entry->queryclass = PGQUERY_EXTENDED; + + if (pqPipelineFlush(conn) < 0) + goto sendFailed; + + conn->asyncStatus = PGASYNC_BUSY; + return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + return 0; +} + /* * PQsendQueryStart * Common startup code for PQsendQuery and sibling routines diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 0852584edae..2311f555137 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -525,6 +525,14 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramLengths, const int *paramFormats, int resultFormat); +extern int PQsendQueryPreparedWithCursorOptions(PGconn *conn, const char *stmtName, + int nParams, const char *const *paramValues, + const int *paramLengths, const int *paramFormats, + int resultFormat, const char *portalName, int cursorOptions); +extern int PQsendBindWithCursorOptions(PGconn *conn, const char *stmtName, + int nParams, const char *const *paramValues, + const int *paramLengths, const int *paramFormats, + int resultFormat, const char *portalName, int cursorOptions); extern int PQsetSingleRowMode(PGconn *conn); extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); extern PGresult *PQgetResult(PGconn *conn); diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index b3af70fa09b..bf97cab88d3 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -2082,6 +2082,93 @@ process_result(PGconn *conn, PGresult *res, int results, int numsent) return got_error; } +/* + * Test holdable cursors using protocol 3.3 cursor options in Bind message. + */ +static void +test_holdable_cursor(PGconn *conn) +{ + PGresult *res; + + fprintf(stderr, "holdable cursor... "); + + /* Verify protocol 3.3 */ + if (PQfullProtocolVersion(conn) < 30003) + pg_fatal("protocol 3.3 required, got %d", PQfullProtocolVersion(conn)); + + /* Start transaction */ + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("BEGIN failed: %s", PQerrorMessage(conn)); + PQclear(res); + + /* Create test table */ + res = PQexec(conn, "CREATE TEMP TABLE holdable_test(id int)"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("CREATE TABLE failed: %s", PQerrorMessage(conn)); + PQclear(res); + + res = PQexec(conn, "INSERT INTO holdable_test VALUES (1), (2), (3)"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("INSERT failed: %s", PQerrorMessage(conn)); + PQclear(res); + + /* Prepare statement */ + res = PQprepare(conn, "holdstmt", "SELECT * FROM holdable_test", 0, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("PREPARE failed: %s", PQerrorMessage(conn)); + PQclear(res); + + /* Enter pipeline mode */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + /* Create holdable portal using Bind with cursor options (no Execute) */ + if (PQsendBindWithCursorOptions(conn, "holdstmt", 0, NULL, NULL, NULL, 0, "holdportal", 0x0020) != 1) + pg_fatal("PQsendBindWithCursorOptions failed: %s", PQerrorMessage(conn)); + + /* Commit - portal should survive */ + if (PQsendQueryParams(conn, "COMMIT", 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("COMMIT failed: %s", PQerrorMessage(conn)); + + /* Execute portal after commit using FETCH (portals created via Bind are cursors) */ + if (PQsendQueryParams(conn, "FETCH ALL FROM holdportal", 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("FETCH failed: %s", PQerrorMessage(conn)); + + /* Close portal */ + if (PQsendQueryParams(conn, "CLOSE holdportal", 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("CLOSE failed: %s", PQerrorMessage(conn)); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + /* Get results */ + res = confirm_result_status(conn, PGRES_TUPLES_OK); /* RowDescription from Bind+Describe */ + if (PQnfields(res) != 1) + pg_fatal("expected 1 field, got %d", PQnfields(res)); + PQclear(res); + consume_null_result(conn); + + /* COMMIT result seems to be skipped/combined - this is a libpq behavior */ + + res = confirm_result_status(conn, PGRES_TUPLES_OK); /* FETCH after commit */ + if (PQntuples(res) != 3) + pg_fatal("expected 3 rows after commit, got %d", PQntuples(res)); + PQclear(res); + consume_null_result(conn); + + consume_result_status(conn, PGRES_COMMAND_OK); /* CLOSE */ + consume_null_result(conn); + + consume_result_status(conn, PGRES_PIPELINE_SYNC); + consume_null_result(conn); + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + static void usage(const char *progname) @@ -2100,6 +2187,7 @@ print_test_list(void) { printf("cancel\n"); printf("disallowed_in_pipeline\n"); + printf("holdable_cursor\n"); printf("multi_pipelines\n"); printf("nosync\n"); printf("pipeline_abort\n"); @@ -2207,6 +2295,8 @@ main(int argc, char **argv) test_cancel(conn); else if (strcmp(testname, "disallowed_in_pipeline") == 0) test_disallowed_in_pipeline(conn); + else if (strcmp(testname, "holdable_cursor") == 0) + test_holdable_cursor(conn); else if (strcmp(testname, "multi_pipelines") == 0) test_multi_pipelines(conn); else if (strcmp(testname, "nosync") == 0) -- 2.50.1 (Apple Git-155)