From 4c6daeb460ace314b220a9bf1a5ff59f2afd8ce1 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 11 Jun 2024 12:15:16 +0530 Subject: [PATCH v20240625 1/3] Introduce pg_sequence_state and SetSequence functions for enhanced sequence management This patch introduces new functionalities to PostgreSQL: - pg_sequence_state allows retrieval of sequence values using LSN. - SetSequence enables updating sequences with user-specified values. --- src/backend/commands/sequence.c | 161 ++++++++++++++++++++++++++++++-- src/include/catalog/pg_proc.dat | 8 ++ src/include/commands/sequence.h | 1 + 3 files changed, 162 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 28f8522264..57453a7356 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -45,6 +45,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" #include "utils/resowner.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -102,7 +103,8 @@ static Relation lock_and_open_sequence(SeqTable seq); static void create_seq_hashtable(void); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); static Form_pg_sequence_data read_seq_tuple(Relation rel, - Buffer *buf, HeapTuple seqdatatuple); + Buffer *buf, HeapTuple seqdatatuple, + XLogRecPtr *lsn); static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, @@ -277,7 +279,7 @@ ResetSequence(Oid seq_relid) * indeed a sequence. */ init_sequence(seq_relid, &elm, &seq_rel); - (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple); + (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple, NULL); pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seq_relid)); if (!HeapTupleIsValid(pgstuple)) @@ -328,6 +330,80 @@ ResetSequence(Oid seq_relid) sequence_close(seq_rel, NoLock); } +/* + * Set a sequence to a specified internal state. + * + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, + * which must not be released until end of transaction. Caller is also + * responsible for permissions checking. + */ +void +SetSequence(Oid seq_relid, int64 value) +{ + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqdatatuple; + Form_pg_sequence_data seq; + + /* open and lock sequence */ + init_sequence(seq_relid, &elm, &seqrel); + + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); + + /* check the comment above nextval_internal()'s equivalent call. */ + if (RelationNeedsWAL(seqrel)) + { + GetTopTransactionId(); + + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + + /* ready to change the on-disk (or really, in-buffer) tuple */ + START_CRIT_SECTION(); + + seq->last_value = value; + seq->is_called = true; + seq->log_cnt = 0; + + MarkBufferDirty(buf); + + /* XLOG stuff */ + if (RelationNeedsWAL(seqrel)) + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + Page page = BufferGetPage(buf); + + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); + + xlrec.locator = seqrel->rd_locator; + + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + + /* allow filtering by origin on a sequence update */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buf); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seqrel, NoLock); +} + /* * Initialize a sequence's relation with the specified tuple as content * @@ -476,7 +552,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) seqform = (Form_pg_sequence) GETSTRUCT(seqtuple); /* lock page buffer and read tuple into new sequence structure */ - (void) read_seq_tuple(seqrel, &buf, &datatuple); + (void) read_seq_tuple(seqrel, &buf, &datatuple, NULL); /* copy the existing sequence data tuple, so it can be modified locally */ newdatatuple = heap_copytuple(&datatuple); @@ -551,7 +627,7 @@ SequenceChangePersistence(Oid relid, char newrelpersistence) if (RelationNeedsWAL(seqrel)) GetTopTransactionId(); - (void) read_seq_tuple(seqrel, &buf, &seqdatatuple); + (void) read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); RelationSetNewRelfilenumber(seqrel, newrelpersistence); fill_seq_with_data(seqrel, &seqdatatuple); UnlockReleaseBuffer(buf); @@ -680,7 +756,7 @@ nextval_internal(Oid relid, bool check_permissions) ReleaseSysCache(pgstuple); /* lock page buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); page = BufferGetPage(buf); last = next = result = seq->last_value; @@ -976,7 +1052,7 @@ do_setval(Oid relid, int64 next, bool iscalled) PreventCommandIfParallelMode("setval()"); /* lock page buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); if ((next < minv) || (next > maxv)) ereport(ERROR, @@ -1180,7 +1256,8 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel) * Function's return value points to the data payload of the tuple */ static Form_pg_sequence_data -read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) +read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple, + XLogRecPtr *lsn) { Page page; ItemId lp; @@ -1197,6 +1274,13 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) elog(ERROR, "bad magic number in sequence \"%s\": %08X", RelationGetRelationName(rel), sm->magic); + /* + * If the caller requested it, set the page LSN. This allows deciding + * which sequence changes are before/after the returned sequence state. + */ + if (lsn) + *lsn = PageGetLSN(page); + lp = PageGetItemId(page, FirstOffsetNumber); Assert(ItemIdIsNormal(lp)); @@ -1804,7 +1888,7 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) HeapTupleData seqtuple; Form_pg_sequence_data seq; - seq = read_seq_tuple(seqrel, &buf, &seqtuple); + seq = read_seq_tuple(seqrel, &buf, &seqtuple, NULL); is_called = seq->is_called; result = seq->last_value; @@ -1819,6 +1903,67 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } +/* + * Return the current on-disk state of the sequence. + * + * Note: This is roughly equivalent to selecting the data from the sequence, + * except that it also returns the page LSN. + */ +Datum +pg_sequence_state(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqtuple; + Form_pg_sequence_data seq; + Datum result; + + int64 last_value; + int64 log_cnt; + bool is_called; + XLogRecPtr lsn; + + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[4]; + bool nulls[4]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* open and lock sequence */ + init_sequence(relid, &elm, &seqrel); + + if (pg_class_aclcheck(elm->relid, GetUserId(), + ACL_SELECT | ACL_USAGE) != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for sequence %s", + RelationGetRelationName(seqrel)))); + + seq = read_seq_tuple(seqrel, &buf, &seqtuple, &lsn); + + is_called = seq->is_called; + last_value = seq->last_value; + log_cnt = seq->log_cnt; + + UnlockReleaseBuffer(buf); + relation_close(seqrel, NoLock); + + values[0] = LSNGetDatum(lsn); + values[1] = Int64GetDatum(last_value); + values[2] = Int64GetDatum(log_cnt); + values[3] = BoolGetDatum(is_called); + + memset(nulls, 0, sizeof(nulls)); + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} void seq_redo(XLogReaderState *record) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6a5476d3c4..990ef2f836 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3329,6 +3329,14 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, +{ oid => '6313', + descr => 'current on-disk sequence state', + proname => 'pg_sequence_state', provolatile => 'v', + prorettype => 'record', proargtypes => 'regclass', + proallargtypes => '{regclass,pg_lsn,int8,int8,bool}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{seq_oid,page_lsn,last_value,log_cnt,is_called}', + prosrc => 'pg_sequence_state' }, { oid => '275', descr => 'return the next oid for a system table', proname => 'pg_nextoid', provolatile => 'v', proparallel => 'u', diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index e88cbee3b5..fad731a733 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid seq_relid, int64 value); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); -- 2.34.1