From caba80f3d29eb199e966575a6d05f5a2941355fb Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Mon, 20 Nov 2023 11:20:52 +0300 Subject: [PATCH v2] Flush large data immediately in pqcomm If the data is larger than send buffer size in pqcomm, we're sure that the send buffer will be flushed at least once to fit the data depending on how large the data is. Instead of repeatedly calling memcpy and then flushing data larger than the available space in the send buffer, this patch changes internal_putbytes() logic to flush large data immediately if the buffer is empty without unnecessarily copying it into the pqcomm send buffer. --- src/backend/libpq/pqcomm.c | 59 +++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 6497100a1a..7c54745f69 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -145,6 +145,7 @@ static int socket_putmessage(char msgtype, const char *s, size_t len); static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static int internal_flush_buffer(const char *s, int *start, int *end); static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); @@ -1282,14 +1283,32 @@ internal_putbytes(const char *s, size_t len) if (internal_flush()) return EOF; } - amount = PqSendBufferSize - PqSendPointer; - if (amount > len) - amount = len; - memcpy(PqSendBuffer + PqSendPointer, s, amount); - PqSendPointer += amount; - s += amount; - len -= amount; + + /* + * If the buffer is empty and data length is larger than the buffer + * size, send it without buffering. Otherwise, put as much data as + * possible into the buffer. + */ + if (!pq_is_send_pending() && len >= PqSendBufferSize) + { + int start = 0; + + socket_set_nonblocking(false); + if (internal_flush_buffer(s, &start, (int *)&len)) + return EOF; + } + else + { + amount = PqSendBufferSize - PqSendPointer; + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + } } + return 0; } @@ -1323,11 +1342,25 @@ socket_flush(void) */ static int internal_flush(void) +{ + /* flush the pending output from send buffer. */ + return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer); +} + +/* -------------------------------- + * internal_flush_buffer - flush the given buffer content + * + * Returns 0 if OK (meaning everything was sent, or operation would block + * and the socket is in non-blocking mode), or EOF if trouble. + * -------------------------------- + */ +static int +internal_flush_buffer(const char *s, int *start, int *end) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer + PqSendStart; - char *bufend = PqSendBuffer + PqSendPointer; + char *bufptr = (char*) s + *start; + char *bufend = (char*) s + *end; while (bufptr < bufend) { @@ -1347,7 +1380,7 @@ internal_flush(void) if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; + return 0; } /* @@ -1373,7 +1406,7 @@ internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - PqSendStart = PqSendPointer = 0; + *start = *end = 0; ClientConnectionLost = 1; InterruptPending = 1; return EOF; @@ -1381,10 +1414,10 @@ internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - PqSendStart += r; + *start += r; } - PqSendStart = PqSendPointer = 0; + *start = *end = 0; return 0; } -- 2.34.1