*** a/doc/src/sgml/protocol.sgml --- b/doc/src/sgml/protocol.sgml *************** *** 1039,1044 **** --- 1039,1057 ---- all columns in a given COPY operation will use the same format, but the message design does not assume this.) + + + There is another Copy-related mode called Copy-both, which allows + high-speed bulk data transfer to and from the server. + Copy-both mode is initiated when the backend with walsender mode + executes a START_REPLICATION statement. The + backend sends a CopyBothResponse message to the frontend. Both + the backend and the frontend should then send CopyData messages + between each other until the connection is terminated. Copy-both + mode is used by streaming replication (see ). + + *************** *** 1344,1350 **** The commands accepted in walsender mode are: WAL position XXX/XXX. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a ! CopyOutResponse message, and then starts to stream WAL to the frontend. WAL will continue to be streamed until the connection is broken; no further commands will be accepted. --- 1357,1363 ---- WAL position XXX/XXX. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a ! CopyBothResponse message, and then starts to stream WAL to the frontend. WAL will continue to be streamed until the connection is broken; no further commands will be accepted. *************** *** 2696,2701 **** CopyOutResponse (B) --- 2709,2750 ---- + CopyBothResponse (B) + + + + + + + + Byte1('W') + + + + Identifies the message as a Start Copy Both response. + This message is used only for Streaming Replication. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + + + + + + + DataRow (B) *** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c --- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c *************** *** 50,55 **** static char *recvBuf = NULL; --- 50,56 ---- static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); + static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ *************** *** 64,73 **** _PG_init(void) { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || ! walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; walrcv_disconnect = libpqrcv_disconnect; } --- 65,75 ---- { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || ! walrcv_send != NULL || walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; + walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; } *************** *** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) --- 400,417 ---- return true; } + + /* + * Send a message to XLOG stream. + * + * ereports on error. + */ + static void + libpqrcv_send(const char *buffer, int nbytes) + { + if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || + PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send data to WAL stream: %s", + PQerrorMessage(streamConn)))); + } *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 57,62 **** bool am_walreceiver; --- 57,63 ---- /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; + walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ *************** *** 247,253 **** WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || ! walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* --- 248,254 ---- /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || ! walrcv_send == NULL || walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 287,296 **** WalSndHandshake(void) (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); ! /* Send a CopyOutResponse message, and start streaming */ ! pq_beginmessage(&buf, 'H'); ! pq_sendbyte(&buf, 0); ! pq_sendint(&buf, 0, 2); pq_endmessage(&buf); pq_flush(); --- 287,294 ---- (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); ! /* Send a CopyBothResponse message, and start streaming */ ! pq_beginmessage(&buf, 'W'); pq_endmessage(&buf); pq_flush(); *** a/src/include/replication/walreceiver.h --- b/src/include/replication/walreceiver.h *************** *** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, --- 84,92 ---- char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; + typedef void (*walrcv_send_type) (const char *buffer, int nbytes); + extern PGDLLIMPORT walrcv_send_type walrcv_send; + typedef void (*walrcv_disconnect_type) (void); extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; *** a/src/interfaces/libpq/fe-exec.c --- b/src/interfaces/libpq/fe-exec.c *************** *** 1586,1591 **** PQgetResult(PGconn *conn) --- 1586,1592 ---- res = PQmakeEmptyPGresult(conn, PGRES_COPY_IN); break; case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: if (conn->result && conn->result->resultStatus == PGRES_COPY_OUT) res = pqPrepareAsyncResult(conn); else *************** *** 2000,2006 **** PQnotifies(PGconn *conn) } /* ! * PQputCopyData - send some data to the backend during COPY IN * * Returns 1 if successful, 0 if data could not be sent (only possible * in nonblock mode), or -1 if an error occurs. --- 2001,2007 ---- } /* ! * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH * * Returns 1 if successful, 0 if data could not be sent (only possible * in nonblock mode), or -1 if an error occurs. *************** *** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes) { if (!conn) return -1; ! if (conn->asyncStatus != PGASYNC_COPY_IN) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); --- 2011,2018 ---- { if (!conn) return -1; ! if (conn->asyncStatus != PGASYNC_COPY_IN && ! conn->asyncStatus != PGASYNC_COPY_BOTH) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); *************** *** 2148,2153 **** PQputCopyEnd(PGconn *conn, const char *errormsg) --- 2150,2156 ---- /* * PQgetCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH * * If successful, sets *buffer to point to a malloc'd row of data, and * returns row length (always > 0) as result. *************** *** 2161,2167 **** PQgetCopyData(PGconn *conn, char **buffer, int async) *buffer = NULL; /* for all failure cases */ if (!conn) return -2; ! if (conn->asyncStatus != PGASYNC_COPY_OUT) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); --- 2164,2171 ---- *buffer = NULL; /* for all failure cases */ if (!conn) return -2; ! if (conn->asyncStatus != PGASYNC_COPY_OUT && ! conn->asyncStatus != PGASYNC_COPY_BOTH) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); *** a/src/interfaces/libpq/fe-protocol2.c --- b/src/interfaces/libpq/fe-protocol2.c *************** *** 541,546 **** pqParseInput2(PGconn *conn) --- 541,550 ---- case 'H': /* Start Copy Out */ conn->asyncStatus = PGASYNC_COPY_OUT; break; + /* + * Don't need to process CopyBothResponse here because + * it never arrives from the server during protocol 2.0. + */ default: printfPQExpBuffer(&conn->errorMessage, libpq_gettext( *** a/src/interfaces/libpq/fe-protocol3.c --- b/src/interfaces/libpq/fe-protocol3.c *************** *** 358,363 **** pqParseInput3(PGconn *conn) --- 358,375 ---- conn->asyncStatus = PGASYNC_COPY_OUT; conn->copy_already_done = 0; break; + case 'W': /* Start Copy Both */ + /* + * We don't need to use getCopyStart here since CopyBothResponse + * specifies neither the copy format nor the number of columns in + * the Copy data. They should be always zero. + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT); + if (!conn->result) + return; + conn->asyncStatus = PGASYNC_COPY_BOTH; + conn->copy_already_done = 0; + break; case 'd': /* Copy Data */ /* *** a/src/interfaces/libpq/libpq-int.h --- b/src/interfaces/libpq/libpq-int.h *************** *** 218,224 **** typedef enum PGASYNC_BUSY, /* query in progress */ PGASYNC_READY, /* result ready for PQgetResult */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ ! PGASYNC_COPY_OUT /* Copy Out data transfer in progress */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ --- 218,225 ---- PGASYNC_BUSY, /* query in progress */ PGASYNC_READY, /* result ready for PQgetResult */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ ! PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ ! PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */