From a25bd7013ba491ebc894f5c5dfba3a751ab501c0 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Sat, 6 Apr 2024 19:19:28 +0200 Subject: [PATCH v6] Faster internal_putbytes --- src/backend/libpq/pqcomm.c | 69 ++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 6497100a1a4..df40ef0a5e3 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -119,9 +119,9 @@ static List *sock_paths = NIL; #define PQ_RECV_BUFFER_SIZE 8192 static char *PqSendBuffer; -static int PqSendBufferSize; /* Size send buffer */ -static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ -static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ +static size_t PqSendBufferSize; /* Size send buffer */ +static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */ +static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ @@ -133,6 +133,7 @@ static int PqRecvLength; /* End of data available in PqRecvBuffer */ static bool PqCommBusy; /* busy sending data to the client */ static bool PqCommReadingMsg; /* in the middle of reading a message */ +#define internal_flush() internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer) /* Internal functions */ static void socket_comm_reset(void); @@ -143,8 +144,9 @@ static int socket_flush_if_writable(void); static bool socket_is_send_pending(void); static int socket_putmessage(char msgtype, const char *s, size_t len); static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); -static int internal_putbytes(const char *s, size_t len); -static int internal_flush(void); +static inline int internal_putbytes(const char *s, size_t len); +static pg_noinline int internal_flush_buffer(const char *s, size_t *start, + size_t *end); static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); @@ -1268,11 +1270,9 @@ pq_getmessage(StringInfo s, int maxlen) } -static int +static inline int internal_putbytes(const char *s, size_t len) { - size_t amount; - while (len > 0) { /* If buffer is full, then flush it out */ @@ -1282,14 +1282,33 @@ internal_putbytes(const char *s, size_t len) if (internal_flush()) return EOF; } - amount = PqSendBufferSize - PqSendPointer; - if (amount > len) - amount = len; - memcpy(PqSendBuffer + PqSendPointer, s, amount); - PqSendPointer += amount; - s += amount; - len -= amount; + + /* + * If the buffer is empty and data length is larger than the buffer + * size, send it without buffering. Otherwise, put as much data as + * possible into the buffer. + */ + if (len >= PqSendBufferSize && PqSendStart == PqSendPointer) + { + size_t start = 0; + + socket_set_nonblocking(false); + if (internal_flush_buffer(s, &start, &len)) + return EOF; + } + else + { + size_t amount = PqSendBufferSize - PqSendPointer; + + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + } } + return 0; } @@ -1315,25 +1334,25 @@ socket_flush(void) } /* -------------------------------- - * internal_flush - flush pending output + * internal_flush_buffer - flush the given buffer content * * Returns 0 if OK (meaning everything was sent, or operation would block * and the socket is in non-blocking mode), or EOF if trouble. * -------------------------------- */ -static int -internal_flush(void) +static pg_noinline int +internal_flush_buffer(const char *s, size_t *start, size_t *end) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer + PqSendStart; - char *bufend = PqSendBuffer + PqSendPointer; + const char *bufptr = s + *start; + const char *bufend = s + *end; while (bufptr < bufend) { int r; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = secure_write(MyProcPort, (char *) bufptr, bufend - bufptr); if (r <= 0) { @@ -1373,7 +1392,7 @@ internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - PqSendStart = PqSendPointer = 0; + *start = *end = 0; ClientConnectionLost = 1; InterruptPending = 1; return EOF; @@ -1381,10 +1400,10 @@ internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - PqSendStart += r; + *start += r; } - PqSendStart = PqSendPointer = 0; + *start = *end = 0; return 0; } @@ -1487,7 +1506,7 @@ static void socket_putmessage_noblock(char msgtype, const char *s, size_t len) { int res PG_USED_FOR_ASSERTS_ONLY; - int required; + size_t required; /* * Ensure we have enough space in the output buffer for the message header base-commit: f956ecd0353b2960f8322b2211142113fe2b6f67 -- 2.34.1