Re: Reducing memory consumption for pending inval messages - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: Reducing memory consumption for pending inval messages |
Date | |
Msg-id | 3380133.1626207700@sss.pgh.pa.us Whole thread Raw |
In response to | Reducing memory consumption for pending inval messages (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses |
Re: Reducing memory consumption for pending inval messages
|
List | pgsql-hackers |
I wrote: > It turns out that the existing implementation in inval.c is quite > inefficient when a lot of individual commands each register just > a few invalidations --- but a few invalidations per command is > pretty typical. Per the cfbot, here's a rebase over 3788c6678 (actually just undoing its effects on inval.c, since that code is removed here). regards, tom lane diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 9c79775725..9352c68090 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -71,11 +71,6 @@ * manipulating the init file is in relcache.c, but we keep track of the * need for it here. * - * The request lists proper are kept in CurTransactionContext of their - * creating (sub)transaction, since they can be forgotten on abort of that - * transaction but must be kept till top-level commit otherwise. For - * simplicity we keep the controlling list-of-lists in TopTransactionContext. - * * Currently, inval messages are sent without regard for the possibility * that the object described by the catalog tuple might be a session-local * object such as a temporary table. This is because (1) this code has @@ -106,7 +101,6 @@ #include "catalog/catalog.h" #include "catalog/pg_constraint.h" #include "miscadmin.h" -#include "port/pg_bitutils.h" #include "storage/sinval.h" #include "storage/smgr.h" #include "utils/catcache.h" @@ -121,35 +115,86 @@ /* - * To minimize palloc traffic, we keep pending requests in successively- - * larger chunks (a slightly more sophisticated version of an expansible - * array). All request types can be stored as SharedInvalidationMessage - * records. The ordering of requests within a list is never significant. + * Pending requests are stored as ready-to-send SharedInvalidationMessages. + * We keep the messages themselves in arrays in TopTransactionContext + * (there are separate arrays for catcache and relcache messages). Control + * information is kept in a chain of TransInvalidationInfo structs, also + * allocated in TopTransactionContext. (We could keep a subtransaction's + * TransInvalidationInfo in its CurTransactionContext; but that's more + * wasteful not less so, since in very many scenarios it'd be the only + * allocation in the subtransaction's CurTransactionContext.) + * + * We can store the message arrays densely, and yet avoid moving data around + * within an array, because within any one subtransaction we need only + * distinguish between messages emitted by prior commands and those emitted + * by the current command. Once a command completes and we've done local + * processing on its messages, we can fold those into the prior-commands + * messages just by changing array indexes in the TransInvalidationInfo + * struct. Similarly, we need distinguish messages of prior subtransactions + * from those of the current subtransaction only until the subtransaction + * completes, after which we adjust the array indexes in the parent's + * TransInvalidationInfo to include the subtransaction's messages. + * + * The ordering of the individual messages within a command's or + * subtransaction's output is not considered significant, although this + * implementation happens to preserve the order in which they were queued. + * (Previous versions of this code did not preserve it.) + * + * For notational convenience, control information is kept in two-element + * arrays, the first for catcache messages and the second for relcache + * messages. */ -typedef struct InvalidationChunk +#define CatCacheMsgs 0 +#define RelCacheMsgs 1 + +/* Pointers to main arrays in TopTransactionContext */ +typedef struct InvalMessageArray { - struct InvalidationChunk *next; /* list link */ - int nitems; /* # items currently stored in chunk */ - int maxitems; /* size of allocated array in this chunk */ - SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; -} InvalidationChunk; + SharedInvalidationMessage *msgs; /* palloc'd array (can be expanded) */ + int maxmsgs; /* current allocated size of array */ +} InvalMessageArray; -typedef struct InvalidationListHeader +static InvalMessageArray InvalMessageArrays[2]; + +/* Control information for one logical group of messages */ +typedef struct InvalidationMsgsGroup { - InvalidationChunk *cclist; /* list of chunks holding catcache msgs */ - InvalidationChunk *rclist; /* list of chunks holding relcache msgs */ -} InvalidationListHeader; + int firstmsg[2]; /* first index in relevant array */ + int nextmsg[2]; /* last+1 index */ +} InvalidationMsgsGroup; + +/* Macros to help preserve InvalidationMsgsGroup abstraction */ +#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \ + do { \ + (targetgroup)->firstmsg[subgroup] = \ + (targetgroup)->nextmsg[subgroup] = \ + (priorgroup)->nextmsg[subgroup]; \ + } while (0) + +#define SetGroupToFollow(targetgroup, priorgroup) \ + do { \ + SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \ + SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \ + } while (0) + +#define NumMessagesInSubGroup(group, subgroup) \ + ((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup]) + +#define NumMessagesInGroup(group) \ + (NumMessagesInSubGroup(group, CatCacheMsgs) + \ + NumMessagesInSubGroup(group, RelCacheMsgs)) + /*---------------- - * Invalidation info is divided into two lists: + * Invalidation messages are divided into two groups: * 1) events so far in current command, not yet reflected to caches. * 2) events in previous commands of current transaction; these have * been reflected to local caches, and must be either broadcast to * other backends or rolled back from local cache when we commit * or abort the transaction. - * Actually, we need two such lists for each level of nested transaction, + * Actually, we need such groups for each level of nested transaction, * so that we can discard events from an aborted subtransaction. When - * a subtransaction commits, we append its lists to the parent's lists. + * a subtransaction commits, we append its events to the parent's groups. * * The relcache-file-invalidated flag can just be a simple boolean, * since we only act on it at transaction commit; we don't care which @@ -165,11 +210,11 @@ typedef struct TransInvalidationInfo /* Subtransaction nesting depth */ int my_level; - /* head of current-command event list */ - InvalidationListHeader CurrentCmdInvalidMsgs; + /* Events emitted by current command */ + InvalidationMsgsGroup CurrentCmdInvalidMsgs; - /* head of previous-commands event list */ - InvalidationListHeader PriorCmdInvalidMsgs; + /* Events emitted by previous commands of this (sub)transaction */ + InvalidationMsgsGroup PriorCmdInvalidMsgs; /* init file must be invalidated? */ bool RelcacheInitFileInval; @@ -177,10 +222,6 @@ typedef struct TransInvalidationInfo static TransInvalidationInfo *transInvalInfo = NULL; -static SharedInvalidationMessage *SharedInvalidMessagesArray; -static int numSharedInvalidMessagesArray; -static int maxSharedInvalidMessagesArray; - /* GUC storage */ int debug_discard_caches = 0; @@ -218,124 +259,118 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; /* ---------------------------------------------------------------- - * Invalidation list support functions - * - * These three routines encapsulate processing of the "chunked" - * representation of what is logically just a list of messages. + * Invalidation subgroup support functions * ---------------------------------------------------------------- */ /* * AddInvalidationMessage - * Add an invalidation message to a list (of chunks). + * Add an invalidation message to a (sub)group. + * + * The group must be the last active one, since we assume we can add to the + * end of the relevant InvalMessageArray. * - * Note that we do not pay any great attention to maintaining the original - * ordering of the messages. + * subgroup must be CatCacheMsgs or RelCacheMsgs. */ static void -AddInvalidationMessage(InvalidationChunk **listHdr, - SharedInvalidationMessage *msg) +AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup, + const SharedInvalidationMessage *msg) { - InvalidationChunk *chunk = *listHdr; + InvalMessageArray *ima = &InvalMessageArrays[subgroup]; + int nextindex = group->nextmsg[subgroup]; - if (chunk == NULL) - { - /* First time through; create initial chunk */ -#define FIRSTCHUNKSIZE 32 - chunk = (InvalidationChunk *) - MemoryContextAlloc(CurTransactionContext, - offsetof(InvalidationChunk, msgs) + - FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage)); - chunk->nitems = 0; - chunk->maxitems = FIRSTCHUNKSIZE; - chunk->next = *listHdr; - *listHdr = chunk; - } - else if (chunk->nitems >= chunk->maxitems) + if (nextindex >= ima->maxmsgs) { - /* Need another chunk; double size of last chunk */ - int chunksize = 2 * chunk->maxitems; - - chunk = (InvalidationChunk *) - MemoryContextAlloc(CurTransactionContext, - offsetof(InvalidationChunk, msgs) + - chunksize * sizeof(SharedInvalidationMessage)); - chunk->nitems = 0; - chunk->maxitems = chunksize; - chunk->next = *listHdr; - *listHdr = chunk; + if (ima->msgs == NULL) + { + /* Create new storage array in TopTransactionContext */ + int reqsize = 32; /* arbitrary */ + + ima->msgs = (SharedInvalidationMessage *) + MemoryContextAlloc(TopTransactionContext, + reqsize * sizeof(SharedInvalidationMessage)); + ima->maxmsgs = reqsize; + Assert(nextindex == 0); + } + else + { + /* Enlarge storage array */ + int reqsize = 2 * ima->maxmsgs; + + ima->msgs = (SharedInvalidationMessage *) + repalloc(ima->msgs, + reqsize * sizeof(SharedInvalidationMessage)); + ima->maxmsgs = reqsize; + } } - /* Okay, add message to current chunk */ - chunk->msgs[chunk->nitems] = *msg; - chunk->nitems++; + /* Okay, add message to current group */ + ima->msgs[nextindex] = *msg; + group->nextmsg[subgroup]++; } /* - * Append one list of invalidation message chunks to another, resetting - * the source chunk-list pointer to NULL. + * Append one subgroup of invalidation messages to another, resetting + * the source subgroup to empty. */ static void -AppendInvalidationMessageList(InvalidationChunk **destHdr, - InvalidationChunk **srcHdr) +AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest, + InvalidationMsgsGroup *src, + int subgroup) { - InvalidationChunk *chunk = *srcHdr; - - if (chunk == NULL) - return; /* nothing to do */ - - while (chunk->next != NULL) - chunk = chunk->next; + /* Messages must be adjacent in main array */ + Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]); - chunk->next = *destHdr; + /* ... which makes this easy: */ + dest->nextmsg[subgroup] = src->nextmsg[subgroup]; - *destHdr = *srcHdr; - - *srcHdr = NULL; + /* + * This is handy for some callers and irrelevant for others. But we do it + * always, reasoning that it's bad to leave different groups pointing at + * the same fragment of the message array. + */ + SetSubGroupToFollow(src, dest, subgroup); } /* - * Process a list of invalidation messages. + * Process a subgroup of invalidation messages. * * This is a macro that executes the given code fragment for each message in - * a message chunk list. The fragment should refer to the message as *msg. + * a message subgroup. The fragment should refer to the message as *msg. */ -#define ProcessMessageList(listHdr, codeFragment) \ +#define ProcessMessageSubGroup(group, subgroup, codeFragment) \ do { \ - InvalidationChunk *_chunk; \ - for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \ + int _msgindex = (group)->firstmsg[subgroup]; \ + int _endmsg = (group)->nextmsg[subgroup]; \ + for (; _msgindex < _endmsg; _msgindex++) \ { \ - int _cindex; \ - for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \ - { \ - SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \ - codeFragment; \ - } \ + SharedInvalidationMessage *msg = \ + &InvalMessageArrays[subgroup].msgs[_msgindex]; \ + codeFragment; \ } \ } while (0) /* - * Process a list of invalidation messages group-wise. + * Process a subgroup of invalidation messages as an array. * * As above, but the code fragment can handle an array of messages. * The fragment should refer to the messages as msgs[], with n entries. */ -#define ProcessMessageListMulti(listHdr, codeFragment) \ +#define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \ do { \ - InvalidationChunk *_chunk; \ - for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \ - { \ - SharedInvalidationMessage *msgs = _chunk->msgs; \ - int n = _chunk->nitems; \ + int n = NumMessagesInSubGroup(group, subgroup); \ + if (n > 0) { \ + SharedInvalidationMessage *msgs = \ + &InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \ codeFragment; \ } \ } while (0) /* ---------------------------------------------------------------- - * Invalidation set support functions + * Invalidation group support functions * * These routines understand about the division of a logical invalidation - * list into separate physical lists for catcache and relcache entries. + * group into separate physical arrays for catcache and relcache entries. * ---------------------------------------------------------------- */ @@ -343,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr, * Add a catcache inval entry */ static void -AddCatcacheInvalidationMessage(InvalidationListHeader *hdr, +AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group, int id, uint32 hashValue, Oid dbId) { SharedInvalidationMessage msg; @@ -364,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr, */ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); - AddInvalidationMessage(&hdr->cclist, &msg); + AddInvalidationMessage(group, CatCacheMsgs, &msg); } /* * Add a whole-catalog inval entry */ static void -AddCatalogInvalidationMessage(InvalidationListHeader *hdr, +AddCatalogInvalidationMessage(InvalidationMsgsGroup *group, Oid dbId, Oid catId) { SharedInvalidationMessage msg; @@ -382,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr, /* check AddCatcacheInvalidationMessage() for an explanation */ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); - AddInvalidationMessage(&hdr->cclist, &msg); + AddInvalidationMessage(group, CatCacheMsgs, &msg); } /* * Add a relcache inval entry */ static void -AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, +AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group, Oid dbId, Oid relId) { SharedInvalidationMessage msg; @@ -399,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, * it will never change. InvalidOid for relId means all relations so we * don't need to add individual ones when it is present. */ - ProcessMessageList(hdr->rclist, - if (msg->rc.id == SHAREDINVALRELCACHE_ID && - (msg->rc.relId == relId || - msg->rc.relId == InvalidOid)) - return); + ProcessMessageSubGroup(group, RelCacheMsgs, + if (msg->rc.id == SHAREDINVALRELCACHE_ID && + (msg->rc.relId == relId || + msg->rc.relId == InvalidOid)) + return); /* OK, add the item */ msg.rc.id = SHAREDINVALRELCACHE_ID; @@ -412,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, /* check AddCatcacheInvalidationMessage() for an explanation */ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); - AddInvalidationMessage(&hdr->rclist, &msg); + AddInvalidationMessage(group, RelCacheMsgs, &msg); } /* * Add a snapshot inval entry + * + * We put these into the relcache subgroup for simplicity. */ static void -AddSnapshotInvalidationMessage(InvalidationListHeader *hdr, +AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group, Oid dbId, Oid relId) { SharedInvalidationMessage msg; /* Don't add a duplicate item */ /* We assume dbId need not be checked because it will never change */ - ProcessMessageList(hdr->rclist, - if (msg->sn.id == SHAREDINVALSNAPSHOT_ID && - msg->sn.relId == relId) - return); + ProcessMessageSubGroup(group, RelCacheMsgs, + if (msg->sn.id == SHAREDINVALSNAPSHOT_ID && + msg->sn.relId == relId) + return); /* OK, add the item */ msg.sn.id = SHAREDINVALSNAPSHOT_ID; @@ -438,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr, /* check AddCatcacheInvalidationMessage() for an explanation */ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); - AddInvalidationMessage(&hdr->rclist, &msg); + AddInvalidationMessage(group, RelCacheMsgs, &msg); } /* - * Append one list of invalidation messages to another, resetting - * the source list to empty. + * Append one group of invalidation messages to another, resetting + * the source group to empty. */ static void -AppendInvalidationMessages(InvalidationListHeader *dest, - InvalidationListHeader *src) +AppendInvalidationMessages(InvalidationMsgsGroup *dest, + InvalidationMsgsGroup *src) { - AppendInvalidationMessageList(&dest->cclist, &src->cclist); - AppendInvalidationMessageList(&dest->rclist, &src->rclist); + AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs); + AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs); } /* - * Execute the given function for all the messages in an invalidation list. - * The list is not altered. + * Execute the given function for all the messages in an invalidation group. + * The group is not altered. * * catcache entries are processed first, for reasons mentioned above. */ static void -ProcessInvalidationMessages(InvalidationListHeader *hdr, +ProcessInvalidationMessages(InvalidationMsgsGroup *group, void (*func) (SharedInvalidationMessage *msg)) { - ProcessMessageList(hdr->cclist, func(msg)); - ProcessMessageList(hdr->rclist, func(msg)); + ProcessMessageSubGroup(group, CatCacheMsgs, func(msg)); + ProcessMessageSubGroup(group, RelCacheMsgs, func(msg)); } /* @@ -472,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr, * rather than just one at a time. */ static void -ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr, +ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group, void (*func) (const SharedInvalidationMessage *msgs, int n)) { - ProcessMessageListMulti(hdr->cclist, func(msgs, n)); - ProcessMessageListMulti(hdr->rclist, func(msgs, n)); + ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n)); + ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n)); } /* ---------------------------------------------------------------- @@ -731,7 +768,7 @@ AcceptInvalidationMessages(void) /* * PrepareInvalidationState - * Initialize inval lists for the current (sub)transaction. + * Initialize inval data for the current (sub)transaction. */ static void PrepareInvalidationState(void) @@ -748,12 +785,45 @@ PrepareInvalidationState(void) myInfo->parent = transInvalInfo; myInfo->my_level = GetCurrentTransactionNestLevel(); - /* - * If there's any previous entry, this one should be for a deeper nesting - * level. - */ - Assert(transInvalInfo == NULL || - myInfo->my_level > transInvalInfo->my_level); + /* Now, do we have a previous stack entry? */ + if (transInvalInfo != NULL) + { + /* Yes; this one should be for a deeper nesting level. */ + Assert(myInfo->my_level > transInvalInfo->my_level); + + /* + * The parent (sub)transaction must not have any current (i.e., + * not-yet-locally-processed) messages. If it did, we'd have a + * semantic problem: the new subtransaction presumably ought not be + * able to see those events yet, but since the CommandCounter is + * linear, that can't work once the subtransaction advances the + * counter. This is a convenient place to check for that, as well as + * being important to keep management of the message arrays simple. + */ + if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0) + elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages"); + + /* + * MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group, + * which is fine for the first (sub)transaction, but otherwise we need + * to update them to follow whatever is already in the arrays. + */ + SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs, + &transInvalInfo->CurrentCmdInvalidMsgs); + SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs, + &myInfo->PriorCmdInvalidMsgs); + } + else + { + /* + * Here, we need only clear any array pointers left over from a prior + * transaction. + */ + InvalMessageArrays[CatCacheMsgs].msgs = NULL; + InvalMessageArrays[CatCacheMsgs].maxmsgs = 0; + InvalMessageArrays[RelCacheMsgs].msgs = NULL; + InvalMessageArrays[RelCacheMsgs].maxmsgs = 0; + } transInvalInfo = myInfo; } @@ -777,47 +847,8 @@ PostPrepare_Inval(void) } /* - * Collect invalidation messages into SharedInvalidMessagesArray array. - */ -static void -MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n) -{ - /* - * Initialise array first time through in each commit - */ - if (SharedInvalidMessagesArray == NULL) - { - maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE; - numSharedInvalidMessagesArray = 0; - - /* - * Although this is being palloc'd we don't actually free it directly. - * We're so close to EOXact that we now we're going to lose it anyhow. - */ - SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray - * sizeof(SharedInvalidationMessage)); - } - - if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray) - { - maxSharedInvalidMessagesArray = pg_nextpower2_32(numSharedInvalidMessagesArray + n); - - SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray, - maxSharedInvalidMessagesArray - * sizeof(SharedInvalidationMessage)); - } - - /* - * Append the next chunk onto the array - */ - memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray, - msgs, n * sizeof(SharedInvalidationMessage)); - numSharedInvalidMessagesArray += n; -} - -/* - * xactGetCommittedInvalidationMessages() is executed by - * RecordTransactionCommit() to add invalidation messages onto the + * xactGetCommittedInvalidationMessages() is called by + * RecordTransactionCommit() to collect invalidation messages to add to the * commit record. This applies only to commit message types, never to * abort records. Must always run before AtEOXact_Inval(), since that * removes the data we need to see. @@ -832,7 +863,9 @@ int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, bool *RelcacheInitFileInval) { - MemoryContext oldcontext; + SharedInvalidationMessage *msgarray; + int nummsgs; + int nmsgs; /* Quick exit if we haven't done anything with invalidation messages. */ if (transInvalInfo == NULL) @@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval; /* - * Walk through TransInvalidationInfo to collect all the messages into a - * single contiguous array of invalidation messages. It must be contiguous - * so we can copy directly into WAL message. Maintain the order that they - * would be processed in by AtEOXact_Inval(), to ensure emulated behaviour - * in redo is as similar as possible to original. We want the same bugs, - * if any, not new ones. + * Collect all the pending messages into a single contiguous array of + * invalidation messages, to simplify what needs to happen while building + * the commit WAL message. Maintain the order that they would be + * processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo + * is as similar as possible to original. We want the same bugs, if any, + * not new ones. */ - oldcontext = MemoryContextSwitchTo(CurTransactionContext); - - ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, - MakeSharedInvalidMessagesArray); - ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs, - MakeSharedInvalidMessagesArray); - MemoryContextSwitchTo(oldcontext); - - Assert(!(numSharedInvalidMessagesArray > 0 && - SharedInvalidMessagesArray == NULL)); - - *msgs = SharedInvalidMessagesArray; - - return numSharedInvalidMessagesArray; + nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) + + NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs); + + *msgs = msgarray = (SharedInvalidationMessage *) + MemoryContextAlloc(CurTransactionContext, + nummsgs * sizeof(SharedInvalidationMessage)); + + nmsgs = 0; + ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs, + CatCacheMsgs, + (memcpy(msgarray + nmsgs, + msgs, + n * sizeof(SharedInvalidationMessage)), + nmsgs += n)); + ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + CatCacheMsgs, + (memcpy(msgarray + nmsgs, + msgs, + n * sizeof(SharedInvalidationMessage)), + nmsgs += n)); + ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs, + RelCacheMsgs, + (memcpy(msgarray + nmsgs, + msgs, + n * sizeof(SharedInvalidationMessage)), + nmsgs += n)); + ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + RelCacheMsgs, + (memcpy(msgarray + nmsgs, + msgs, + n * sizeof(SharedInvalidationMessage)), + nmsgs += n)); + Assert(nmsgs == nummsgs); + + return nmsgs; } /* @@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs, * about CurrentCmdInvalidMsgs too, since those changes haven't touched * the caches yet. * - * In any case, reset the various lists to empty. We need not physically + * In any case, reset our state to empty. We need not physically * free memory here, since TopTransactionContext is about to be emptied * anyway. * @@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit) /* Need not free anything explicitly */ transInvalInfo = NULL; - SharedInvalidMessagesArray = NULL; - numSharedInvalidMessagesArray = 0; } /* @@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit) return; } - /* Pass up my inval messages to parent */ + /* + * Pass up my inval messages to parent. Notice that we stick them in + * PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've + * already been locally processed. (This would trigger the Assert in + * AppendInvalidationMessageSubGroup if the parent's + * CurrentCmdInvalidMsgs isn't empty; but we already checked that in + * PrepareInvalidationState.) + */ AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs, &myInfo->PriorCmdInvalidMsgs); + /* Must readjust parent's CurrentCmdInvalidMsgs indexes now */ + SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs, + &myInfo->parent->PriorCmdInvalidMsgs); + /* Pending relcache inval becomes parent's problem too */ if (myInfo->RelcacheInitFileInval) myInfo->parent->RelcacheInitFileInval = true; @@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) /* * LogLogicalInvalidations * - * Emit WAL for invalidations. This is currently only used for logging - * invalidations at the command end or at commit time if any invalidations - * are pending. + * Emit WAL for invalidations caused by the current command. + * + * This is currently only used for logging invalidations at the command end + * or at commit time if any invalidations are pending. */ void -LogLogicalInvalidations() +LogLogicalInvalidations(void) { xl_xact_invals xlrec; - SharedInvalidationMessage *invalMessages; - int nmsgs = 0; + InvalidationMsgsGroup *group; + int nmsgs; /* Quick exit if we haven't done anything with invalidation messages. */ if (transInvalInfo == NULL) return; - ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, - MakeSharedInvalidMessagesArray); - - Assert(!(numSharedInvalidMessagesArray > 0 && - SharedInvalidMessagesArray == NULL)); - - invalMessages = SharedInvalidMessagesArray; - nmsgs = numSharedInvalidMessagesArray; - SharedInvalidMessagesArray = NULL; - numSharedInvalidMessagesArray = 0; + group = &transInvalInfo->CurrentCmdInvalidMsgs; + nmsgs = NumMessagesInGroup(group); if (nmsgs > 0) { @@ -1549,10 +1605,12 @@ LogLogicalInvalidations() /* perform insertion */ XLogBeginInsert(); XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals); - XLogRegisterData((char *) invalMessages, - nmsgs * sizeof(SharedInvalidationMessage)); + ProcessMessageSubGroupMulti(group, CatCacheMsgs, + XLogRegisterData((char *) msgs, + n * sizeof(SharedInvalidationMessage))); + ProcessMessageSubGroupMulti(group, RelCacheMsgs, + XLogRegisterData((char *) msgs, + n * sizeof(SharedInvalidationMessage))); XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); - - pfree(invalMessages); } }
pgsql-hackers by date: