From 7c7f53340bec5147a7b0e896c2ee649c73c75206 Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Tue, 2 Dec 2025 16:03:19 +0800 Subject: [PATCH v4 11/13] cleanup: rename local conn variables to avoid shadowing the global This commit renames all local variables named 'conn' to 'myconn' to avoid shadowing the global connection variable. This ensures the local and global identifiers remain clearly distinct within each scope. A few additional shadowing fixes in the same code areas are included as well. These are unrelated to the conn renaming but occur in the same files, so they are bundled here to keep the commit self-contained. Author: Chao Li Discussion: https://postgr.es/m/CAEoWx2kQ2x5gMaj8tHLJ3=jfC+p5YXHkJyHrDTiQw2nn2FJTmQ@mail.gmail.com --- src/bin/pg_basebackup/pg_basebackup.c | 32 ++++---- src/bin/pg_basebackup/pg_recvlogical.c | 24 +++--- src/bin/pg_basebackup/receivelog.c | 108 ++++++++++++------------- src/bin/pg_basebackup/streamutil.c | 58 ++++++------- 4 files changed, 111 insertions(+), 111 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 0a3ca4315de..5a57c64dcd1 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1013,16 +1013,16 @@ backup_parse_compress_options(char *option, char **algorithm, char **detail, * chunk. */ static void -ReceiveCopyData(PGconn *conn, WriteDataCallback callback, +ReceiveCopyData(PGconn *myconn, WriteDataCallback callback, void *callback_data) { PGresult *res; /* Get the COPY data stream. */ - res = PQgetResult(conn); + res = PQgetResult(myconn); if (PQresultStatus(res) != PGRES_COPY_OUT) pg_fatal("could not get COPY data stream: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); PQclear(res); /* Loop over chunks until done. */ @@ -1031,7 +1031,7 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, int r; char *copybuf; - r = PQgetCopyData(conn, ©buf, 0); + r = PQgetCopyData(myconn, ©buf, 0); if (r == -1) { /* End of chunk. */ @@ -1039,7 +1039,7 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, } else if (r == -2) pg_fatal("could not read COPY data: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); if (bgchild_exited) pg_fatal("background process terminated unexpectedly"); @@ -1283,7 +1283,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * manifest if present - as a single COPY stream. */ static void -ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) +ReceiveArchiveStream(PGconn *myconn, pg_compress_specification *compress) { ArchiveStreamState state; @@ -1293,7 +1293,7 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) state.compress = compress; /* All the real work happens in ReceiveArchiveStreamChunk. */ - ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); + ReceiveCopyData(myconn, ReceiveArchiveStreamChunk, &state); /* If we wrote the backup manifest to a file, close the file. */ if (state.manifest_file !=NULL) @@ -1598,7 +1598,7 @@ ReportCopyDataParseError(size_t r, char *copybuf) * receive the backup manifest and inject it into that tarfile. */ static void -ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, +ReceiveTarFile(PGconn *myconn, char *archive_name, char *spclocation, bool tablespacenum, pg_compress_specification *compress) { WriteTarState state; @@ -1609,16 +1609,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, /* Pass all COPY data through to the backup streamer. */ memset(&state, 0, sizeof(state)); is_recovery_guc_supported = - PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC; + PQserverVersion(myconn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC; expect_unterminated_tarfile = - PQserverVersion(conn) < MINIMUM_VERSION_FOR_TERMINATED_TARFILE; + PQserverVersion(myconn) < MINIMUM_VERSION_FOR_TERMINATED_TARFILE; state.streamer = CreateBackupStreamer(archive_name, spclocation, &manifest_inject_streamer, is_recovery_guc_supported, expect_unterminated_tarfile, compress); state.tablespacenum = tablespacenum; - ReceiveCopyData(conn, ReceiveTarCopyChunk, &state); + ReceiveCopyData(myconn, ReceiveTarCopyChunk, &state); progress_update_filename(NULL); /* @@ -1633,7 +1633,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, /* Slurp the entire backup manifest into a buffer. */ initPQExpBuffer(&buf); - ReceiveBackupManifestInMemory(conn, &buf); + ReceiveBackupManifestInMemory(myconn, &buf); if (PQExpBufferDataBroken(buf)) pg_fatal("out of memory"); @@ -1697,7 +1697,7 @@ get_tablespace_mapping(const char *dir) * Receive the backup manifest file and write it out to a file. */ static void -ReceiveBackupManifest(PGconn *conn) +ReceiveBackupManifest(PGconn *myconn) { WriteManifestState state; @@ -1707,7 +1707,7 @@ ReceiveBackupManifest(PGconn *conn) if (state.file == NULL) pg_fatal("could not create file \"%s\": %m", state.filename); - ReceiveCopyData(conn, ReceiveBackupManifestChunk, &state); + ReceiveCopyData(myconn, ReceiveBackupManifestChunk, &state); fclose(state.file); } @@ -1734,9 +1734,9 @@ ReceiveBackupManifestChunk(size_t r, char *copybuf, void *callback_data) * Receive the backup manifest file and write it out to a file. */ static void -ReceiveBackupManifestInMemory(PGconn *conn, PQExpBuffer buf) +ReceiveBackupManifestInMemory(PGconn *myconn, PQExpBuffer buf) { - ReceiveCopyData(conn, ReceiveBackupManifestInMemoryChunk, buf); + ReceiveCopyData(myconn, ReceiveBackupManifestInMemoryChunk, buf); } /* diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 14ad1504678..7801623fec9 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -73,8 +73,8 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; static void usage(void); static void StreamLogicalLog(void); -static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); -static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, +static bool flushAndSendFeedback(PGconn *myconn, TimestampTz *now); +static void prepareToTerminate(PGconn *myconn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn); @@ -126,7 +126,7 @@ usage(void) * Send a Standby Status Update message to server. */ static bool -sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) +sendFeedback(PGconn *myconn, TimestampTz now, bool force, bool replyRequested) { static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; @@ -167,10 +167,10 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) last_written_lsn = output_written_lsn; last_fsync_lsn = output_fsync_lsn; - if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + if (PQputCopyData(myconn, replybuf, len) <= 0 || PQflush(myconn)) { pg_log_error("could not send feedback packet: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); return false; } @@ -1045,13 +1045,13 @@ main(int argc, char **argv) * feedback. */ static bool -flushAndSendFeedback(PGconn *conn, TimestampTz *now) +flushAndSendFeedback(PGconn *myconn, TimestampTz *now) { /* flush data to disk, so that we send a recent flush pointer */ if (!OutputFsync(*now)) return false; *now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, *now, true, false)) + if (!sendFeedback(myconn, *now, true, false)) return false; return true; @@ -1062,11 +1062,11 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now) * retry on failure. */ static void -prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, +prepareToTerminate(PGconn *myconn, XLogRecPtr end_pos, StreamStopReason reason, XLogRecPtr lsn) { - (void) PQputCopyEnd(conn, NULL); - (void) PQflush(conn); + (void) PQputCopyEnd(myconn, NULL); + (void) PQflush(myconn); if (verbose) { @@ -1077,12 +1077,12 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, break; case STREAM_STOP_KEEPALIVE: pg_log_info("end position %X/%08X reached by keepalive", - LSN_FORMAT_ARGS(endpos)); + LSN_FORMAT_ARGS(end_pos)); break; case STREAM_STOP_END_OF_WAL: Assert(XLogRecPtrIsValid(lsn)); pg_log_info("end position %X/%08X reached by WAL record at %X/%08X", - LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + LSN_FORMAT_ARGS(end_pos), LSN_FORMAT_ARGS(lsn)); break; case STREAM_STOP_NONE: Assert(false); diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 25b13c7f55c..9ec1eee088b 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -32,18 +32,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ -static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, +static PGresult *HandleCopyStream(PGconn *myconn, StreamCtl *stream, XLogRecPtr *stoppos); -static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket); -static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +static int CopyStreamPoll(PGconn *myconn, long timeout_ms, pgsocket stop_socket); +static int CopyStreamReceive(PGconn *myconn, long timeout, pgsocket stop_socket, char **buffer); -static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, +static bool ProcessKeepaliveMsg(PGconn *myconn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); -static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, +static bool ProcessWALDataMsg(PGconn *myconn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos); -static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, +static PGresult *HandleEndOfCopyStream(PGconn *myconn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos); -static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos); +static bool CheckCopyStreamStop(PGconn *myconn, StreamCtl *stream, XLogRecPtr blockpos); static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status); @@ -334,7 +334,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) * Send a Standby Status Update message to server. */ static bool -sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested) +sendFeedback(PGconn *myconn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested) { char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; @@ -355,10 +355,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; - if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + if (PQputCopyData(myconn, replybuf, len) <= 0 || PQflush(myconn)) { pg_log_error("could not send feedback packet: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); return false; } @@ -372,7 +372,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque * If it's not, an error message is printed to stderr, and false is returned. */ bool -CheckServerVersionForStreaming(PGconn *conn) +CheckServerVersionForStreaming(PGconn *myconn) { int minServerMajor, maxServerMajor; @@ -386,10 +386,10 @@ CheckServerVersionForStreaming(PGconn *conn) */ minServerMajor = 903; maxServerMajor = PG_VERSION_NUM / 100; - serverMajor = PQserverVersion(conn) / 100; + serverMajor = PQserverVersion(myconn) / 100; if (serverMajor < minServerMajor) { - const char *serverver = PQparameterStatus(conn, "server_version"); + const char *serverver = PQparameterStatus(myconn, "server_version"); pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s", serverver ? serverver : "'unknown'", @@ -398,7 +398,7 @@ CheckServerVersionForStreaming(PGconn *conn) } else if (serverMajor > maxServerMajor) { - const char *serverver = PQparameterStatus(conn, "server_version"); + const char *serverver = PQparameterStatus(myconn, "server_version"); pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s", serverver ? serverver : "'unknown'", @@ -450,7 +450,7 @@ CheckServerVersionForStreaming(PGconn *conn) * Note: The WAL location *must* be at a log segment start! */ bool -ReceiveXlogStream(PGconn *conn, StreamCtl *stream) +ReceiveXlogStream(PGconn *myconn, StreamCtl *stream) { char query[128]; char slotcmd[128]; @@ -461,7 +461,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) * The caller should've checked the server version already, but doesn't do * any harm to check it here too. */ - if (!CheckServerVersionForStreaming(conn)) + if (!CheckServerVersionForStreaming(myconn)) return false; /* @@ -497,7 +497,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) /* * Get the server system identifier and timeline, and validate them. */ - if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL)) + if (!RunIdentifySystem(myconn, &sysidentifier, &servertli, NULL, NULL)) { pg_free(sysidentifier); return false; @@ -536,7 +536,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) if (!existsTimeLineHistoryFile(stream)) { snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline); - res = PQexec(conn, query); + res = PQexec(myconn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { /* FIXME: we might send it ok, but get an error */ @@ -576,7 +576,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) slotcmd, LSN_FORMAT_ARGS(stream->startpos), stream->timeline); - res = PQexec(conn, query); + res = PQexec(myconn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { pg_log_error("could not send replication command \"%s\": %s", @@ -587,7 +587,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) PQclear(res); /* Stream the WAL */ - res = HandleCopyStream(conn, stream, &stoppos); + res = HandleCopyStream(myconn, stream, &stoppos); if (res == NULL) goto error; @@ -636,7 +636,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } /* Read the final result, which should be CommandComplete. */ - res = PQgetResult(conn); + res = PQgetResult(myconn); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pg_log_error("unexpected termination of replication stream: %s", @@ -742,7 +742,7 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) * On any other sort of error, returns NULL. */ static PGresult * -HandleCopyStream(PGconn *conn, StreamCtl *stream, +HandleCopyStream(PGconn *myconn, StreamCtl *stream, XLogRecPtr *stoppos) { char *copybuf = NULL; @@ -760,7 +760,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, /* * Check if we should continue streaming, or abort at this point. */ - if (!CheckCopyStreamStop(conn, stream, blockpos)) + if (!CheckCopyStreamStop(myconn, stream, blockpos)) goto error; now = feGetCurrentTimestamp(); @@ -780,7 +780,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Send feedback so that the server sees the latest WAL locations * immediately. */ - if (!sendFeedback(conn, blockpos, now, false)) + if (!sendFeedback(myconn, blockpos, now, false)) goto error; last_status = now; } @@ -793,7 +793,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, stream->standby_message_timeout)) { /* Time to send feedback! */ - if (!sendFeedback(conn, blockpos, now, false)) + if (!sendFeedback(myconn, blockpos, now, false)) goto error; last_status = now; } @@ -808,14 +808,14 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, PQfreemem(copybuf); copybuf = NULL; - r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf); + r = CopyStreamReceive(myconn, sleeptime, stream->stop_socket, ©buf); while (r != 0) { if (r == -1) goto error; if (r == -2) { - PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos); + PGresult *res = HandleEndOfCopyStream(myconn, stream, copybuf, blockpos, stoppos); if (res == NULL) goto error; @@ -826,20 +826,20 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, /* Check the message type. */ if (copybuf[0] == PqReplMsg_Keepalive) { - if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos, + if (!ProcessKeepaliveMsg(myconn, stream, copybuf, r, blockpos, &last_status)) goto error; } else if (copybuf[0] == PqReplMsg_WALData) { - if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos)) + if (!ProcessWALDataMsg(myconn, stream, copybuf, r, &blockpos)) goto error; /* * Check if we should continue streaming, or abort at this * point. */ - if (!CheckCopyStreamStop(conn, stream, blockpos)) + if (!CheckCopyStreamStop(myconn, stream, blockpos)) goto error; } else @@ -857,7 +857,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Process the received data, and any subsequent data we can read * without blocking. */ - r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf); + r = CopyStreamReceive(myconn, 0, stream->stop_socket, ©buf); } } @@ -875,7 +875,7 @@ error: * or interrupted by signal or stop_socket input, and -1 on an error. */ static int -CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) +CopyStreamPoll(PGconn *myconn, long timeout_ms, pgsocket stop_socket) { int ret; fd_set input_mask; @@ -884,10 +884,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) struct timeval timeout; struct timeval *timeoutptr; - connsocket = PQsocket(conn); + connsocket = PQsocket(myconn); if (connsocket < 0) { - pg_log_error("invalid socket: %s", PQerrorMessage(conn)); + pg_log_error("invalid socket: %s", PQerrorMessage(myconn)); return -1; } @@ -937,7 +937,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) * -1 on error. -2 if the server ended the COPY. */ static int -CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +CopyStreamReceive(PGconn *myconn, long timeout, pgsocket stop_socket, char **buffer) { char *copybuf = NULL; @@ -947,7 +947,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, Assert(*buffer == NULL); /* Try to receive a CopyData message */ - rawlen = PQgetCopyData(conn, ©buf, 1); + rawlen = PQgetCopyData(myconn, ©buf, 1); if (rawlen == 0) { int ret; @@ -957,20 +957,20 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, * the specified timeout, so that we can ping the server. Also stop * waiting if input appears on stop_socket. */ - ret = CopyStreamPoll(conn, timeout, stop_socket); + ret = CopyStreamPoll(myconn, timeout, stop_socket); if (ret <= 0) return ret; /* Now there is actually data on the socket */ - if (PQconsumeInput(conn) == 0) + if (PQconsumeInput(myconn) == 0) { pg_log_error("could not receive data from WAL stream: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); return -1; } /* Now that we've consumed some input, try again */ - rawlen = PQgetCopyData(conn, ©buf, 1); + rawlen = PQgetCopyData(myconn, ©buf, 1); if (rawlen == 0) return 0; } @@ -978,7 +978,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, return -2; if (rawlen == -2) { - pg_log_error("could not read COPY data: %s", PQerrorMessage(conn)); + pg_log_error("could not read COPY data: %s", PQerrorMessage(myconn)); return -1; } @@ -991,7 +991,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, * Process the keepalive message. */ static bool -ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, +ProcessKeepaliveMsg(PGconn *myconn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status) { int pos; @@ -1033,7 +1033,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, blockpos, now, false)) + if (!sendFeedback(myconn, blockpos, now, false)) return false; *last_status = now; } @@ -1045,7 +1045,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * Process WALData message. */ static bool -ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, +ProcessWALDataMsg(PGconn *myconn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos) { int xlogoff; @@ -1156,10 +1156,10 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true)) { - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + if (PQputCopyEnd(myconn, NULL) <= 0 || PQflush(myconn)) { pg_log_error("could not send copy-end packet: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); return false; } still_sending = false; @@ -1176,10 +1176,10 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * Handle end of the copy stream. */ static PGresult * -HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, +HandleEndOfCopyStream(PGconn *myconn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos) { - PGresult *res = PQgetResult(conn); + PGresult *res = PQgetResult(myconn); /* * The server closed its end of the copy stream. If we haven't closed @@ -1196,14 +1196,14 @@ HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, } if (PQresultStatus(res) == PGRES_COPY_IN) { - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + if (PQputCopyEnd(myconn, NULL) <= 0 || PQflush(myconn)) { pg_log_error("could not send copy-end packet: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); PQclear(res); return NULL; } - res = PQgetResult(conn); + res = PQgetResult(myconn); } still_sending = false; } @@ -1215,7 +1215,7 @@ HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, * Check if we should continue streaming, or abort at this point. */ static bool -CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos) +CheckCopyStreamStop(PGconn *myconn, StreamCtl *stream, XLogRecPtr blockpos) { if (still_sending && stream->stream_stop(blockpos, stream->timeline, false)) { @@ -1224,10 +1224,10 @@ CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos) /* Potential error message is written by close_walfile */ return false; } - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + if (PQputCopyEnd(myconn, NULL) <= 0 || PQflush(myconn)) { pg_log_error("could not send copy-end packet: %s", - PQerrorMessage(conn)); + PQerrorMessage(myconn)); return false; } still_sending = false; diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index e5a7cb6e5b1..342ad2cbd4f 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -31,7 +31,7 @@ int WalSegSz; -static bool RetrieveDataDirCreatePerm(PGconn *conn); +static bool RetrieveDataDirCreatePerm(PGconn *myconn); /* SHOW command for replication connection was introduced in version 10 */ #define MINIMUM_VERSION_FOR_SHOW_CMD 100000 @@ -273,7 +273,7 @@ GetConnection(void) * since ControlFile is not accessible here. */ bool -RetrieveWalSegSize(PGconn *conn) +RetrieveWalSegSize(PGconn *myconn) { PGresult *res; char xlog_unit[3]; @@ -281,20 +281,20 @@ RetrieveWalSegSize(PGconn *conn) multiplier = 1; /* check connection existence */ - Assert(conn != NULL); + Assert(myconn != NULL); /* for previous versions set the default xlog seg size */ - if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD) + if (PQserverVersion(myconn) < MINIMUM_VERSION_FOR_SHOW_CMD) { WalSegSz = DEFAULT_XLOG_SEG_SIZE; return true; } - res = PQexec(conn, "SHOW wal_segment_size"); + res = PQexec(myconn, "SHOW wal_segment_size"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not send replication command \"%s\": %s", - "SHOW wal_segment_size", PQerrorMessage(conn)); + "SHOW wal_segment_size", PQerrorMessage(myconn)); PQclear(res); return false; @@ -352,23 +352,23 @@ RetrieveWalSegSize(PGconn *conn) * on the data directory. */ static bool -RetrieveDataDirCreatePerm(PGconn *conn) +RetrieveDataDirCreatePerm(PGconn *myconn) { PGresult *res; int data_directory_mode; /* check connection existence */ - Assert(conn != NULL); + Assert(myconn != NULL); /* for previous versions leave the default group access */ - if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS) + if (PQserverVersion(myconn) < MINIMUM_VERSION_FOR_GROUP_ACCESS) return true; - res = PQexec(conn, "SHOW data_directory_mode"); + res = PQexec(myconn, "SHOW data_directory_mode"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not send replication command \"%s\": %s", - "SHOW data_directory_mode", PQerrorMessage(conn)); + "SHOW data_directory_mode", PQerrorMessage(myconn)); PQclear(res); return false; @@ -406,7 +406,7 @@ RetrieveDataDirCreatePerm(PGconn *conn) * - Database name (NULL in servers prior to 9.4) */ bool -RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, +RunIdentifySystem(PGconn *myconn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name) { PGresult *res; @@ -414,13 +414,13 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, lo; /* Check connection existence */ - Assert(conn != NULL); + Assert(myconn != NULL); - res = PQexec(conn, "IDENTIFY_SYSTEM"); + res = PQexec(myconn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not send replication command \"%s\": %s", - "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + "IDENTIFY_SYSTEM", PQerrorMessage(myconn)); PQclear(res); return false; @@ -460,7 +460,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, if (db_name != NULL) { *db_name = NULL; - if (PQserverVersion(conn) >= 90400) + if (PQserverVersion(myconn) >= 90400) { if (PQnfields(res) < 4) { @@ -487,7 +487,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, * Returns false on failure, and true otherwise. */ bool -GetSlotInformation(PGconn *conn, const char *slot_name, +GetSlotInformation(PGconn *myconn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli) { PGresult *res; @@ -502,13 +502,13 @@ GetSlotInformation(PGconn *conn, const char *slot_name, query = createPQExpBuffer(); appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name); - res = PQexec(conn, query->data); + res = PQexec(myconn, query->data); destroyPQExpBuffer(query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not send replication command \"%s\": %s", - "READ_REPLICATION_SLOT", PQerrorMessage(conn)); + "READ_REPLICATION_SLOT", PQerrorMessage(myconn)); PQclear(res); return false; } @@ -581,13 +581,13 @@ GetSlotInformation(PGconn *conn, const char *slot_name, * returns true in case of success. */ bool -CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, +CreateReplicationSlot(PGconn *myconn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, bool slot_exists_ok, bool two_phase, bool failover) { PQExpBuffer query; PGresult *res; - bool use_new_option_syntax = (PQserverVersion(conn) >= 150000); + bool use_new_option_syntax = (PQserverVersion(myconn) >= 150000); query = createPQExpBuffer(); @@ -617,15 +617,15 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, } else { - if (failover && PQserverVersion(conn) >= 170000) + if (failover && PQserverVersion(myconn) >= 170000) AppendPlainCommandOption(query, use_new_option_syntax, "FAILOVER"); - if (two_phase && PQserverVersion(conn) >= 150000) + if (two_phase && PQserverVersion(myconn) >= 150000) AppendPlainCommandOption(query, use_new_option_syntax, "TWO_PHASE"); - if (PQserverVersion(conn) >= 100000) + if (PQserverVersion(myconn) >= 100000) { /* pg_recvlogical doesn't use an exported snapshot, so suppress */ if (use_new_option_syntax) @@ -649,7 +649,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, } /* Now run the query */ - res = PQexec(conn, query->data); + res = PQexec(myconn, query->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) { const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); @@ -665,7 +665,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, else { pg_log_error("could not send replication command \"%s\": %s", - query->data, PQerrorMessage(conn)); + query->data, PQerrorMessage(myconn)); destroyPQExpBuffer(query); PQclear(res); @@ -694,7 +694,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, * returns true in case of success. */ bool -DropReplicationSlot(PGconn *conn, const char *slot_name) +DropReplicationSlot(PGconn *myconn, const char *slot_name) { PQExpBuffer query; PGresult *res; @@ -706,11 +706,11 @@ DropReplicationSlot(PGconn *conn, const char *slot_name) /* Build query */ appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slot_name); - res = PQexec(conn, query->data); + res = PQexec(myconn, query->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pg_log_error("could not send replication command \"%s\": %s", - query->data, PQerrorMessage(conn)); + query->data, PQerrorMessage(myconn)); destroyPQExpBuffer(query); PQclear(res); -- 2.39.5 (Apple Git-154)