[PATCH 12/16] Add state to keep track of logical replication - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | [PATCH 12/16] Add state to keep track of logical replication |
Date | |
Msg-id | 1339586927-13156-12-git-send-email-andres@2ndquadrant.com |
In response to | [RFC][PATCH] Logical Replication/BDR prototype and architecture (Andres Freund <andres@2ndquadrant.com>) |
List | pgsql-hackers |
From: Andres Freund <andres@anarazel.de>

In order to have restartable replication with minimal additional writes it's
very useful to know up to which point we have replayed/received changes from a
foreign node.

One representation of that is the lsn of changes at the originating cluster.

We need to keep track of the point up to which we received data and up to where
we applied data.

For that we added a field 'origin_lsn' to commit records. This allows us to keep
track of the apply position across crash recovery with minimal additional io. We
only added the field to non-compact commit records to reduce the overhead in
case logical replication is not used.

Checkpoints need to keep track of the apply/receive positions as well because
otherwise it would be hard to determine the lsn from where to restart
receive/apply after a shutdown/crash if no changes happened since the last
shutdown/crash.

While running, the startup process, the walreceiver and a (future) apply process
will need a coherent picture of those two states, so add shared memory state to
keep track of it. Currently this is represented in the walreceiver's shared
memory segment. This will likely need to change.

During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory's, and thus the next checkpoint's,
notion of the apply state.

Missing:

- For correct crash recovery we need more state than the 'apply lsn' because
  transactions on the originating side can overlap. At the lsn we just applied
  many other transactions can be in progress. To correctly handle that we need
  to keep track of the oldest start lsn of any transaction currently being
  reassembled (cf. ApplyCache). Then we can start to reassemble the ApplyCache
  from that point and throw away any transaction which committed before the
  recorded/recovered apply lsn.
  It should be sufficient to store that knowledge in shared memory and
  checkpoint records.
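The restart rule described under "Missing" can be sketched in standalone C roughly as follows. This is only an illustration and not code from the patch: `SketchLsn`, `SketchTxn`, `sketch_restart_lsn` and `sketch_should_discard` are hypothetical names, LSNs are modelled as plain 64-bit integers instead of `XLogRecPtr`, and the set of transactions being reassembled is a flat array rather than the ApplyCache. It also assumes the apply lsn is the commit lsn of the last applied transaction, so a commit at exactly that lsn counts as already applied.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays the role of zeroRecPtr. */
typedef uint64_t SketchLsn;

/* One remote transaction as seen while reassembling the change stream. */
typedef struct SketchTxn
{
	SketchLsn	start_lsn;		/* lsn of the transaction's first change */
	SketchLsn	commit_lsn;		/* lsn of its commit, 0 if still in progress */
} SketchTxn;

/*
 * Where to restart reading the remote stream: the oldest start lsn of any
 * transaction still being reassembled, or the apply lsn if there is none.
 */
static SketchLsn
sketch_restart_lsn(const SketchTxn *txns, size_t ntxns, SketchLsn apply_lsn)
{
	SketchLsn	restart = apply_lsn;
	size_t		i;

	for (i = 0; i < ntxns; i++)
	{
		if (txns[i].commit_lsn == 0 && txns[i].start_lsn < restart)
			restart = txns[i].start_lsn;
	}
	return restart;
}

/*
 * After restarting from an earlier point, a reassembled transaction whose
 * commit is not beyond the recovered apply lsn was applied before the
 * crash and has to be thrown away (assumption: "<=" rather than "<").
 */
static bool
sketch_should_discard(SketchLsn commit_lsn, SketchLsn apply_lsn)
{
	return commit_lsn != 0 && commit_lsn <= apply_lsn;
}
```

With the apply lsn at 150 and two transactions still open that started at 120 and 140, reading would resume at 120, and any transaction that turns out to have committed at or before 150 would be dropped instead of applied a second time.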
---
 src/backend/access/transam/xact.c          |   22 ++++++++-
 src/backend/access/transam/xlog.c          |   73 ++++++++++++++++++++++++++++
 src/backend/replication/walreceiverfuncs.c |    8 +++
 src/include/access/xact.h                  |    1 +
 src/include/catalog/pg_control.h           |   13 ++++-
 src/include/replication/walreceiver.h      |   13 +++++
 6 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dc30a17..40ac965 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -39,11 +39,13 @@
 #include "replication/logical.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "replication/walreceiver.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
 	/*
 	 * Do we need the long commit record? If not, use the compact format.
 	 */
-	if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+	if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+		(wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
 	{
 		XLogRecData rdata[4];
 		int			lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
 		xlrec.nrels = nrels;
 		xlrec.nsubxacts = nchildren;
 		xlrec.nmsgs = nmsgs;
+		xlrec.origin_lsn = current_replication_origin_lsn;
+
 		rdata[0].data = (char *) (&xlrec);
 		rdata[0].len = MinSizeOfXactCommit;
 		rdata[0].buffer = InvalidBuffer;
@@ -4575,6 +4580,21 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
 		LWLockRelease(XidGenLock);
 	}
 
+	/*
+	 * record where were at wrt to recovery. We need that to know from where on
+	 * to restart applying logical change records
+	 */
+	if(LogicalWalReceiverActive() && !XLByteEQ(origin_lsn, zeroRecPtr))
+	{
+		/*
+		 * probably we don't need the locking because no lcr receiver can run
+		 * yet.
+		 */
+		SpinLockAcquire(&WalRcv->mutex);
+		WalRcv->mm_applyState[originating_node] = origin_lsn;
+		SpinLockRelease(&WalRcv->mutex);
+	}
+
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0622726..20a4611 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
 	uint64		sysidentifier;
 	struct timeval tv;
 	pg_crc32	crc;
+	int			i;
 
 	/*
 	 * Select a hopefully-unique system identifier code for this installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
 
+	for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId;
+		i++){
+		checkPoint.logicalReceiveState[i] = zeroRecPtr;
+		checkPoint.logicalApplyState[i] = zeroRecPtr;
+	}
+
+
 	ShmemVariableCache->nextXid = checkPoint.nextXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
 	ShmemVariableCache->oidCount = 0;
@@ -6314,6 +6322,53 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	/*
+	 * setup shared memory state for logical wal receiver
+	 *
+	 * Do this unconditionally so enabling/disabling/enabling logical replay
+	 * doesn't loose information due to rewriting pg_control
+	 */
+	{
+		int i;
+
+		Assert(WalRcv);
+		/* locking is not really required here afaics, but ... */
+		SpinLockAcquire(&WalRcv->mutex);
+
+		for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+			i++)
+		{
+			XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+			XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+			if(i == guc_replication_origin_id && (
+				   !XLByteEQ(*receiveState, zeroRecPtr) ||
+				   !XLByteEQ(*applyState, zeroRecPtr))
+				)
+			{
+				elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+					 applyState->xlogid, applyState->xrecoff,
+					 receiveState->xlogid, receiveState->xrecoff,
+					 guc_replication_origin_id);
+				WalRcv->mm_receiveState[i] = zeroRecPtr;
+				WalRcv->mm_applyState[i] = zeroRecPtr;
+			}
+			else{
+				WalRcv->mm_receiveState[i] = *receiveState;
+				WalRcv->mm_applyState[i] = *applyState;
+			}
+		}
+		SpinLockRelease(&WalRcv->mutex);
+
+		/* FIXME: remove at some point */
+		for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+			i++){
+			elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+				 i,
+				 WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+				 WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+		}
+	}
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7906,6 +7961,24 @@ CreateCheckPoint(int flags)
 							 &checkPoint.nextMultiOffset);
 
 	/*
+	 * fill out where are at wrt logical replay. Do this unconditionally so we
+	 * don't loose information due to rewriting pg_control when toggling
+	 * logical replay
+	 */
+	{
+		int i;
+		SpinLockAcquire(&WalRcv->mutex);
+
+		for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+			i++){
+			checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+			checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+		}
+		SpinLockRelease(&WalRcv->mutex);
+		elog(LOG, "updated logical checkpoint data");
+	}
+
+	/*
 	 * Having constructed the checkpoint record, ensure all shmem disk buffers
 	 * and commit-log buffers are flushed to disk.
 	 *
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 876196f..cb49282 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
+
+		memset(&WalRcv->mm_receiveState,
+			   0, sizeof(WalRcv->mm_receiveState));
+		memset(&WalRcv->mm_applyState,
+			   0, sizeof(WalRcv->mm_applyState));
+
+		memset(&WalRcv->mm_receiveLatch,
+			   0, sizeof(WalRcv->mm_receiveLatch));
 	}
 }
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b12d2a0..2757782 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
 	int			nmsgs;			/* number of shared inval msgs */
 	Oid			dbId;			/* MyDatabaseId */
 	Oid			tsId;			/* MyDatabaseTableSpace */
+	XLogRecPtr	origin_lsn;		/* location of originating commit */
 	/* Array of RelFileNode(s) to drop at commit */
 	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
 	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 5cff396..bc6316e 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -16,12 +16,13 @@
 #define PG_CONTROL_H
 
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
 #include "pgtime.h"				/* for pg_time_t */
 #include "utils/pg_crc.h"
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION	922
+#define PG_CONTROL_VERSION	923
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
 	 * it's set to InvalidTransactionId.
 	 */
 	TransactionId oldestActiveXid;
+
+	/*
+	 * The replay state from every other node. This is only needed if wal_level
+	 * >= logical and thus is only filled then.
+	 */
+	XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+	XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
 } CheckPoint;
 
 /* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
  * NOTE: try to keep this under 512 bytes so that it will fit on one physical
  * sector of typical disk drives.  This reduces the odds of corruption due to
  * power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
  */
 
 typedef struct ControlFileData
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index d21ec94..c9ab1be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -14,6 +14,8 @@
 
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -90,6 +92,17 @@ typedef struct
 	char		conninfo[MAXCONNINFO];
 
 	slock_t		mutex;			/* locks shared variables shown above */
+
+	/*
+	 * replay point up to which we replayed for every node
+	 * XXX: should possibly be dynamically sized?
+	 * FIXME: should go to its own shm segment?
+	 */
+	XLogRecPtr mm_receiveState[MaxMultimasterNodeId - 1];
+	XLogRecPtr mm_applyState[MaxMultimasterNodeId - 1];
+
+	Latch* mm_receiveLatch[MaxMultimasterNodeId - 1];
+
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
-- 
1.7.10.rc3.3.g19a6c.dirty
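
The FIXME in the pg_control.h hunk proposes writing the control file as write(temp_file);fsync();rename();fsync(). A minimal sketch of that sequence with plain POSIX calls might look like the following. It is not PostgreSQL code: `sketch_write_atomically`, the `.tmp` suffix and the fixed path buffers are assumptions made for illustration, and the final fsync() is taken to mean syncing the containing directory so that the rename itself is durable.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Replace dir/name with the given bytes: write a temp file, fsync it,
 * rename it over the target, then fsync the directory.
 * Returns 0 on success, -1 on failure.  All names are hypothetical.
 */
static int
sketch_write_atomically(const char *dir, const char *name,
						const void *data, size_t len)
{
	char		tmp_path[1024];
	char		final_path[1024];
	const char *p = data;
	size_t		done = 0;
	int			n;
	int			fd;
	int			dirfd;

	n = snprintf(tmp_path, sizeof(tmp_path), "%s/%s.tmp", dir, name);
	if (n < 0 || (size_t) n >= sizeof(tmp_path))
		return -1;
	n = snprintf(final_path, sizeof(final_path), "%s/%s", dir, name);
	if (n < 0 || (size_t) n >= sizeof(final_path))
		return -1;

	fd = open(tmp_path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
	if (fd < 0)
		return -1;

	/* write() may be partial, so loop until everything is out */
	while (done < len)
	{
		ssize_t		written = write(fd, p + done, len - done);

		if (written < 0)
		{
			close(fd);
			unlink(tmp_path);
			return -1;
		}
		done += (size_t) written;
	}

	/* make the new contents durable before they become visible */
	if (fsync(fd) != 0)
	{
		close(fd);
		unlink(tmp_path);
		return -1;
	}
	if (close(fd) != 0)
	{
		unlink(tmp_path);
		return -1;
	}

	/* atomically swap the new file into place */
	if (rename(tmp_path, final_path) != 0)
	{
		unlink(tmp_path);
		return -1;
	}

	/* make the rename durable by syncing the directory entry */
	dirfd = open(dir, O_RDONLY);
	if (dirfd < 0)
		return -1;
	if (fsync(dirfd) != 0)
	{
		close(dirfd);
		return -1;
	}
	return close(dirfd);
}
```

Because readers only ever see either the old or the new file, the payload no longer has to fit into a single 512-byte sector, which is the limit the NOTE above the FIXME is concerned with.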