From ac08c66244a5f55f56df36fefaef67dcd43ac2c6 Mon Sep 17 00:00:00 2001
From: Nitin Motiani
Date: Wed, 22 Apr 2026 11:50:01 +0000
Subject: [PATCH v4 1/6] Implement bulk API for Large Objects

This commit introduces new bulk APIs (lo_bulk_put and lo_bulk_write) to
significantly improve the performance of ingesting large numbers of large
objects. By passing arrays of OIDs and bytea data, clients bypass the
network latency and query-parsing overhead of executing thousands of
individual lo_put or lo_write commands (see the usage example at the end
of this message).

Implementation Details:

* Heap Packing and WAL Consolidation: The backend uses table_multi_insert
  to buffer up to 100 data-page tuples and insert them in a single call,
  packing multiple tuples into each 8KB heap page before flushing. This
  reduces locking contention and consolidates Write-Ahead Log (WAL)
  records.

* Index Maintenance (flush_lo_inserts): Because table_multi_insert bypasses
  index updates, the helper function flush_lo_inserts iterates over the
  batched slots and performs individual ExecInsertIndexTuples calls to keep
  pg_largeobject_loid_pn_index up to date.

* Deferred Snapshot Updates: Instead of calling CommandCounterIncrement()
  after every single page write, the bulk APIs defer the command counter
  increment until the end of the function, reducing CPU and snapshot
  management overhead.

* Internal Metadata Batching: The internal C function inv_bulk_create
  (called by lo_bulk_put) creates the pg_largeobject_metadata catalog
  entries with the same table_multi_insert batching strategy (up to 1,000
  entries per batch). A standalone lo_bulk_create SQL function could later
  be exposed to clients on top of this internal infrastructure.

* Memory Management Optimization: Variable-length bytea allocations are
  confined to a temporary batchcontext to keep the memory footprint small.
  The TupleTableSlots are allocated once in the persistent transaction
  context and reused via ExecClearTuple, avoiding repeated slot creation
  and destruction.

* Safe Fast-Path Fallback: lo_bulk_write restricts table_multi_insert to
  append-only writes of new pages (offset >= inv_getsize() &&
  offset % LOBLKSIZE == 0). It falls back to the standard read-modify-write
  logic of inv_write() when overwriting existing pages to prevent duplicate
  key constraint violations.

Testing: Adds regression tests to largeobject.sql, including large batch
inserts (up to 1,500 objects) to verify memory stability and correct
enforcement of the MAX_LO_BUFFERED_TUPLES and
MAX_LO_METADATA_BUFFERED_TUPLES limits. Also tests transaction safety,
rollbacks, and NULL inputs.
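Usage example (illustrative only; the OIDs and payloads are arbitrary and
mirror the calls exercised by the new regression tests; 131072 is INV_WRITE):

    -- Create two new large objects and load their contents in one call.
    SELECT lo_bulk_put(ARRAY[1001, 1002],
                       ARRAY[E'\\x1234'::bytea, E'\\x5678'::bytea]);

    -- Write to already-existing large objects through open descriptors.
    BEGIN;
    SELECT lo_bulk_write(ARRAY[lo_open(1001, 131072), lo_open(1002, 131072)],
                         ARRAY[E'\\x1111'::bytea, E'\\x2222'::bytea]);
    COMMIT;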
--- src/backend/libpq/be-fsstubs.c | 167 +++++++++ src/backend/storage/large_object/inv_api.c | 411 +++++++++++++++++++++ src/include/catalog/pg_proc.dat | 6 + src/include/libpq/be-fsstubs.h | 6 + src/include/storage/large_object.h | 17 + src/test/regress/expected/largeobject.out | 239 ++++++++++++ src/test/regress/sql/largeobject.sql | 96 +++++ 7 files changed, 942 insertions(+) diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c index f27e374c4ee..f026ecd8993 100644 --- a/src/backend/libpq/be-fsstubs.c +++ b/src/backend/libpq/be-fsstubs.c @@ -44,13 +44,16 @@ #include "access/xact.h" #include "catalog/pg_largeobject.h" +#include "catalog/pg_type.h" #include "libpq/be-fsstubs.h" #include "libpq/libpq-fs.h" #include "miscadmin.h" #include "storage/fd.h" #include "storage/large_object.h" +#include "utils/array.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "varatt.h" @@ -871,3 +874,167 @@ be_lo_put(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +int64 +lo_bulk_write(const LoBulkWriteItem *items, int nitems) +{ + if (nitems <= 0) + return 0; + + PreventCommandIfReadOnly("lo_bulk_write()"); + + /* lo_cleanup_needed is already set by lo_open calls required for fds */ + + return inv_bulk_write(items, nitems); +} + +int64 +lo_bulk_put(const LoBulkPutItem *items, int nitems) +{ + if (nitems <= 0) + return 0; + + PreventCommandIfReadOnly("lo_bulk_put()"); + + lo_cleanup_needed = true; + + return inv_bulk_put(items, nitems); +} + +Datum +be_lo_bulk_put(PG_FUNCTION_ARGS) +{ + ArrayType *oids_arr = PG_GETARG_ARRAYTYPE_P(0); + ArrayType *contents_arr = PG_GETARG_ARRAYTYPE_P(1); + Oid oids_eltype = ARR_ELEMTYPE(oids_arr); + Oid contents_eltype = ARR_ELEMTYPE(contents_arr); + int16 oids_elmlen, + contents_elmlen; + bool oids_elmbyval, + contents_elmbyval; + char oids_elmalign, + contents_elmalign; + Datum *oid_datums, + *content_datums; + bool *oid_nulls, + *content_nulls; + int noids, + ncontents; + LoBulkPutItem *items; + int64 res; + int i; + + if (oids_eltype != OIDOID) + elog(ERROR, "expected OID array for first argument"); + if (contents_eltype != BYTEAOID) + elog(ERROR, "expected bytea array for second argument"); + + get_typlenbyvalalign(oids_eltype, &oids_elmlen, &oids_elmbyval, &oids_elmalign); + get_typlenbyvalalign(contents_eltype, &contents_elmlen, &contents_elmbyval, &contents_elmalign); + + deconstruct_array(oids_arr, oids_eltype, oids_elmlen, oids_elmbyval, oids_elmalign, + &oid_datums, &oid_nulls, &noids); + deconstruct_array(contents_arr, contents_eltype, contents_elmlen, contents_elmbyval, contents_elmalign, + &content_datums, &content_nulls, &ncontents); + + if (noids != ncontents) + elog(ERROR, "oid and content arrays must have same length"); + + if (noids == 0) + PG_RETURN_INT64(0); + + items = palloc(noids * sizeof(LoBulkPutItem)); + for (i = 0; i < noids; i++) + { + bytea *b; + + if (oid_nulls[i] || content_nulls[i]) + elog(ERROR, "array elements cannot be null"); + items[i].loid = DatumGetObjectId(oid_datums[i]); + b = DatumGetByteaPP(content_datums[i]); + items[i].buf = VARDATA_ANY(b); + items[i].len = VARSIZE_ANY_EXHDR(b); + } + + res = lo_bulk_put(items, noids); + pfree(items); + + PG_RETURN_INT64(res); +} + +Datum +be_lo_bulk_write(PG_FUNCTION_ARGS) +{ + ArrayType *fds_arr = PG_GETARG_ARRAYTYPE_P(0); + ArrayType *contents_arr = PG_GETARG_ARRAYTYPE_P(1); + Oid fds_eltype = ARR_ELEMTYPE(fds_arr); + Oid contents_eltype = ARR_ELEMTYPE(contents_arr); + int16 
fds_elmlen, + contents_elmlen; + bool fds_elmbyval, + contents_elmbyval; + char fds_elmalign, + contents_elmalign; + Datum *fd_datums, + *content_datums; + bool *fd_nulls, + *content_nulls; + int nfds, + ncontents; + LoBulkWriteItem *items; + int64 res; + int i; + + if (fds_eltype != INT4OID) + elog(ERROR, "expected integer array for first argument"); + if (contents_eltype != BYTEAOID) + elog(ERROR, "expected bytea array for second argument"); + + get_typlenbyvalalign(fds_eltype, &fds_elmlen, &fds_elmbyval, &fds_elmalign); + get_typlenbyvalalign(contents_eltype, &contents_elmlen, &contents_elmbyval, &contents_elmalign); + + deconstruct_array(fds_arr, fds_eltype, fds_elmlen, fds_elmbyval, fds_elmalign, + &fd_datums, &fd_nulls, &nfds); + deconstruct_array(contents_arr, contents_eltype, contents_elmlen, contents_elmbyval, contents_elmalign, + &content_datums, &content_nulls, &ncontents); + + if (nfds != ncontents) + elog(ERROR, "fd and content arrays must have same length"); + + if (nfds == 0) + PG_RETURN_INT64(0); + + items = palloc(nfds * sizeof(LoBulkWriteItem)); + for (i = 0; i < nfds; i++) + { + int fd = DatumGetInt32(fd_datums[i]); + LargeObjectDesc *lobj; + bytea *b; + + if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("invalid large-object descriptor: %d", fd))); + lobj = cookies[fd]; + + /* see comment in lo_read() */ + if ((lobj->flags & IFS_WRLOCK) == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("large object descriptor %d was not opened for writing", + fd))); + + if (fd_nulls[i] || content_nulls[i]) + elog(ERROR, "array elements cannot be null"); + + items[i].desc = lobj; + b = DatumGetByteaPP(content_datums[i]); + items[i].buf = VARDATA_ANY(b); + items[i].len = VARSIZE_ANY_EXHDR(b); + } + + res = lo_bulk_write(items, nfds); + pfree(items); + + PG_RETURN_INT64(res); +} diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index a3cce496c20..fe6d7873af5 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -35,12 +35,18 @@ #include "access/detoast.h" #include "access/genam.h" #include "access/htup_details.h" +#include "access/heapam.h" +#include "access/tableam.h" #include "access/table.h" #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/objectaccess.h" #include "catalog/pg_largeobject.h" +#include "catalog/pg_largeobject_metadata.h" +#include "commands/trigger.h" +#include "executor/executor.h" +#include "executor/tuptable.h" #include "libpq/libpq-fs.h" #include "miscadmin.h" #include "storage/large_object.h" @@ -734,6 +740,411 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes) return nwritten; } +#define MAX_LO_BUFFERED_TUPLES 100 +#define MAX_LO_METADATA_BUFFERED_TUPLES 1000 + +/* + * Use the larger of the two limits for the static array size so the shared + * state can always handle both. This prevents any future risk of buffer + * overflow if the macro values are changed. + * + * NOTE: This leads to a slightly more bloated array on the stack. Based on + * review comments, we might change this approach. + */ +#define MAX_BULK_SLOTS (MAX_LO_METADATA_BUFFERED_TUPLES > MAX_LO_BUFFERED_TUPLES ? 
MAX_LO_METADATA_BUFFERED_TUPLES : MAX_LO_BUFFERED_TUPLES) + +typedef struct LoBulkInsertState +{ + Relation rel; + int max_slots; + TupleTableSlot *slots[MAX_BULK_SLOTS]; + int nslots; + BulkInsertState bistate; + MemoryContext batchcontext; + EState *estate; + ResultRelInfo *resultRelInfo; +} LoBulkInsertState; + +static void +flush_lo_inserts(LoBulkInsertState *state) +{ + MemoryContext oldcontext; + int i; + + if (state->nslots == 0) + return; + + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(state->estate)); + table_multi_insert(state->resultRelInfo->ri_RelationDesc, + state->slots, + state->nslots, + GetCurrentCommandId(true), + 0, + state->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < state->nslots; i++) + { + if (state->resultRelInfo->ri_NumIndices > 0) + { + ExecInsertIndexTuples(state->resultRelInfo, + state->estate, 0, state->slots[i], + NIL, NULL); + } + ExecClearTuple(state->slots[i]); + } +} + + + +static void +init_lo_bulk_insert_state(LoBulkInsertState *state, Relation rel, int max_slots, MemoryContext batchcontext) +{ + int i; + + state->rel = rel; + state->max_slots = max_slots; + state->nslots = 0; + state->estate = NULL; + state->resultRelInfo = NULL; + + state->bistate = GetBulkInsertState(); + state->batchcontext = batchcontext; + + for (i = 0; i < max_slots; ++i) + state->slots[i] = table_slot_create(rel, NULL); +} + +static void +ensure_lo_bulk_estate(LoBulkInsertState *state) +{ + if (!state->estate) + { + state->estate = CreateExecutorState(); + state->resultRelInfo = makeNode(ResultRelInfo); + state->resultRelInfo->ri_RangeTableIndex = 1; + state->resultRelInfo->ri_RelationDesc = state->rel; + state->resultRelInfo->ri_TrigDesc = NULL; + state->resultRelInfo->ri_FdwRoutine = NULL; + ExecOpenIndices(state->resultRelInfo, false); + } +} + +static void +buffer_lo_page(LoBulkInsertState *state, Oid loid, int32 pageno, const char *buf, int len) +{ + MemoryContext oldcontext; + bytea *pdata; + + oldcontext = MemoryContextSwitchTo(state->batchcontext); + pdata = (bytea *) palloc(len + VARHDRSZ); + SET_VARSIZE(pdata, len + VARHDRSZ); + memcpy(VARDATA(pdata), buf, len); + MemoryContextSwitchTo(oldcontext); + + ExecClearTuple(state->slots[state->nslots]); + state->slots[state->nslots]->tts_values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(loid); + state->slots[state->nslots]->tts_values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno); + state->slots[state->nslots]->tts_values[Anum_pg_largeobject_data - 1] = PointerGetDatum(pdata); + state->slots[state->nslots]->tts_isnull[Anum_pg_largeobject_loid - 1] = false; + state->slots[state->nslots]->tts_isnull[Anum_pg_largeobject_pageno - 1] = false; + state->slots[state->nslots]->tts_isnull[Anum_pg_largeobject_data - 1] = false; + ExecStoreVirtualTuple(state->slots[state->nslots]); + + state->nslots++; + + if (state->nslots >= state->max_slots) + { + ensure_lo_bulk_estate(state); + flush_lo_inserts(state); + state->nslots = 0; + MemoryContextReset(state->batchcontext); + } +} + +static void +flush_and_cleanup_lo_bulk_insert_state(LoBulkInsertState *state) +{ + int i; + + if (state->nslots > 0) + { + ensure_lo_bulk_estate(state); + flush_lo_inserts(state); + state->nslots = 0; + } + + for (i = 0; i < state->max_slots; ++i) + ExecDropSingleTupleTableSlot(state->slots[i]); + + if (state->estate) + { + ExecCloseIndices(state->resultRelInfo); + FreeExecutorState(state->estate); + } + + FreeBulkInsertState(state->bistate); + MemoryContextDelete(state->batchcontext); +} + +int64 
+inv_bulk_write(const LoBulkWriteItem *reqs, int nreqs) +{ + int64 total_written = 0; + LoBulkInsertState state; + MemoryContext batchcontext; + int i; + + if (nreqs == 0) + return 0; + + open_lo_relation(); + + batchcontext = AllocSetContextCreate(CurrentMemoryContext, + "LO bulk write context", + ALLOCSET_DEFAULT_SIZES); + init_lo_bulk_insert_state(&state, lo_heap_r, MAX_LO_BUFFERED_TUPLES, batchcontext); + + for (i = 0; i < nreqs; ++i) + { + LargeObjectDesc *obj_desc = reqs[i].desc; + const char *buf = reqs[i].buf; + const int nbytes = reqs[i].len; + int nwritten = 0; + + Assert(PointerIsValid(obj_desc)); + Assert(buf != NULL); + + /* enforce writability because snapshot is probably wrong otherwise */ + if ((obj_desc->flags & IFS_WRLOCK) == 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + obj_desc->id))); + + if (nbytes <= 0) + continue; + + /* this addition can't overflow because nbytes is only int32 */ + if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid large object write request size: %d", + nbytes))); + + /* + * The table_multi_insert path offers performance gains but only + * supports inserts; it cannot update existing rows. We explicitly + * choose not to implement complex ON CONFLICT DO UPDATE or + * read-modify-write logic in the bulk API to keep the fast-path + * simple. + * + * We only use the fast-path if we are at or past the end of the large + * object AND we are starting at a page boundary. If we are + * overwriting existing data or appending to a partial page, we fall + * back to the safe, slow-path inv_write(). + * + * NOTE: The performance benefits of the bulk API are only realized + * when the write offset is a multiple of LOBLKSIZE (typically 2048 + * bytes) and is at or past EOF. Unaligned writes will silently fall + * back to individual inv_write() calls to ensure data integrity. + */ + if (obj_desc->offset < inv_getsize(obj_desc) || + (obj_desc->offset % LOBLKSIZE) != 0) + { + if (state.nslots > 0) + { + ensure_lo_bulk_estate(&state); + flush_lo_inserts(&state); + state.nslots = 0; + MemoryContextReset(state.batchcontext); + } + total_written += inv_write(obj_desc, buf, nbytes); + continue; + } + + /* + * Handle aligned writes by batching inserts, assuming we don't + * overwrite pages. + */ + while (nwritten < nbytes) + { + const int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE); + int n = LOBLKSIZE; + + n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten); + + buffer_lo_page(&state, obj_desc->id, pageno, buf + nwritten, n); + + nwritten += n; + obj_desc->offset += n; + } + total_written += nwritten; + } + + flush_and_cleanup_lo_bulk_insert_state(&state); + + /* + * Advance command counter so that my tuple updates will be seen by later + * large-object operations in this transaction. + */ + CommandCounterIncrement(); + + return total_written; +} + +/* + * inv_bulk_put - create and write multiple large objects efficiently. + * + * This function creates and populates multiple large objects in one go. + * It assumes none of the provided LO OIDs exist prior to this call. + * For each request, it creates large object metadata, then batches + * all data page insertions across all requests using table_multi_insert + * for efficiency. 
+ */ +int64 +inv_bulk_put(const LoBulkPutItem *reqs, int nreqs) +{ + int64 total_written = 0; + LoBulkInsertState state; + MemoryContext batchcontext; + int i; + + if (nreqs == 0) + return 0; + + /* + * Create metadata for all LOs first. + */ + inv_bulk_create(reqs, nreqs); + + /* + * Make metadata visible to subsequent heap and index insertions. + */ + CommandCounterIncrement(); + + open_lo_relation(); + + batchcontext = AllocSetContextCreate(CurrentMemoryContext, + "LO bulk put context", + ALLOCSET_DEFAULT_SIZES); + init_lo_bulk_insert_state(&state, lo_heap_r, MAX_LO_BUFFERED_TUPLES, batchcontext); + + for (i = 0; i < nreqs; ++i) + { + const Oid loid = reqs[i].loid; + const char *buf = reqs[i].buf; + const size_t len = reqs[i].len; + size_t nwritten = 0; + int32 pageno = 0; + + while (nwritten < len) + { + int n = LOBLKSIZE; + + if (n > len - nwritten) + n = len - nwritten; + + buffer_lo_page(&state, loid, pageno, buf + nwritten, n); + + nwritten += n; + pageno++; + total_written += n; + } + } + + flush_and_cleanup_lo_bulk_insert_state(&state); + + /* + * Advance command counter so that tuple updates will be seen by later + * large-object operations in this transaction. + */ + CommandCounterIncrement(); + + return total_written; +} + +void +inv_bulk_create(const LoBulkPutItem *reqs, int nreqs) +{ + Relation rel; + int i; + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; + Acl *default_acl; + Oid ownerId; + LoBulkInsertState state; + MemoryContext batchcontext; + + if (nreqs == 0) + return; + + rel = table_open(LargeObjectMetadataRelationId, RowExclusiveLock); + + for (i = 0; i < nreqs; ++i) + { + if (LargeObjectExists(reqs[i].loid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("large object %u already exists", reqs[i].loid))); + } + + ownerId = GetUserId(); + default_acl = get_user_default_acl(OBJECT_LARGEOBJECT, ownerId, + InvalidOid); + + memset(nulls, false, sizeof(nulls)); + if (default_acl == NULL) + nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true; + else + values[Anum_pg_largeobject_metadata_lomacl - 1] = PointerGetDatum(default_acl); + values[Anum_pg_largeobject_metadata_lomowner - 1] = ObjectIdGetDatum(ownerId); + + batchcontext = AllocSetContextCreate(CurrentMemoryContext, + "LO bulk metadata context", + ALLOCSET_DEFAULT_SIZES); + + init_lo_bulk_insert_state(&state, rel, MAX_LO_METADATA_BUFFERED_TUPLES, batchcontext); + + for (i = 0; i < nreqs; ++i) + { + Oid loid = reqs[i].loid; + + values[Anum_pg_largeobject_metadata_oid - 1] = ObjectIdGetDatum(loid); + + ExecClearTuple(state.slots[state.nslots]); + memcpy(state.slots[state.nslots]->tts_values, values, sizeof(values)); + memcpy(state.slots[state.nslots]->tts_isnull, nulls, sizeof(nulls)); + ExecStoreVirtualTuple(state.slots[state.nslots]); + state.nslots++; + + if (state.nslots >= state.max_slots) + { + ensure_lo_bulk_estate(&state); + flush_lo_inserts(&state); + state.nslots = 0; + /* No need to reset batchcontext for metadata as we don't palloc inside the loop */ + } + } + + flush_and_cleanup_lo_bulk_insert_state(&state); + + table_close(rel, RowExclusiveLock); + + for (i = 0; i < nreqs; ++i) + { + Oid loid = reqs[i].loid; + + if (default_acl) + recordDependencyOnNewAcl(LargeObjectRelationId, loid, 0, + ownerId, default_acl); + recordDependencyOnOwner(LargeObjectRelationId, loid, ownerId); + InvokeObjectPostCreateHook(LargeObjectRelationId, loid, 0); + } + + CommandCounterIncrement(); +} + void inv_truncate(LargeObjectDesc *obj_desc, int64 len) { diff --git 
a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fa9ae79082b..813e4431484 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -2078,6 +2078,12 @@ { oid => '958', descr => 'large object position', proname => 'lo_tell', provolatile => 'v', proparallel => 'u', prorettype => 'int4', proargtypes => 'int4', prosrc => 'be_lo_tell' }, +{ oid => '6480', descr => 'large object bulk put', + proname => 'lo_bulk_put', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '_oid _bytea', prosrc => 'be_lo_bulk_put' }, +{ oid => '6481', descr => 'large object bulk write', + proname => 'lo_bulk_write', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '_int4 _bytea', prosrc => 'be_lo_bulk_write' }, { oid => '3171', descr => 'large object position (64 bit)', proname => 'lo_tell64', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'int4', prosrc => 'be_lo_tell64' }, diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h index 8775939f410..1e402787e35 100644 --- a/src/include/libpq/be-fsstubs.h +++ b/src/include/libpq/be-fsstubs.h @@ -22,6 +22,12 @@ extern int lo_read(int fd, char *buf, int len); extern int lo_write(int fd, const char *buf, int len); +struct LoBulkWriteItem; +struct LoBulkPutItem; + +extern int64 lo_bulk_write(const struct LoBulkWriteItem *items, int nitems); +extern int64 lo_bulk_put(const struct LoBulkPutItem *items, int nitems); + /* * Cleanup LOs at xact commit/abort */ diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h index 0291e4e2498..2110c8f2c5e 100644 --- a/src/include/storage/large_object.h +++ b/src/include/storage/large_object.h @@ -51,6 +51,20 @@ typedef struct LargeObjectDesc } LargeObjectDesc; +typedef struct LoBulkWriteItem +{ + LargeObjectDesc *desc; + const char *buf; + int len; +} LoBulkWriteItem; + +typedef struct LoBulkPutItem +{ + Oid loid; + const char *buf; + int len; +} LoBulkPutItem; + /* * Each "page" (tuple) of a large object can hold this much data * @@ -95,6 +109,9 @@ extern int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence); extern int64 inv_tell(LargeObjectDesc *obj_desc); extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes); extern int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes); +extern int64 inv_bulk_write(const LoBulkWriteItem *reqs, int nreqs); +extern int64 inv_bulk_put(const LoBulkPutItem *reqs, int nreqs); +extern void inv_bulk_create(const LoBulkPutItem *reqs, int nreqs); extern void inv_truncate(LargeObjectDesc *obj_desc, int64 len); #endif /* LARGE_OBJECT_H */ diff --git a/src/test/regress/expected/largeobject.out b/src/test/regress/expected/largeobject.out index 4921dd79aee..9984147ed07 100644 --- a/src/test/regress/expected/largeobject.out +++ b/src/test/regress/expected/largeobject.out @@ -560,4 +560,243 @@ ERROR: cannot execute lo_put() in a read-only transaction ROLLBACK; -- Clean up DROP TABLE lotest_stash_values; +-- Test bulk apis +SET bytea_output to hex; +SELECT lo_bulk_put(ARRAY[1001, 1002], ARRAY[E'\\x1234'::bytea, E'\\x5678'::bytea]); + lo_bulk_put +------------- + 4 +(1 row) + +SELECT lo_get(1001); + lo_get +-------- + \x1234 +(1 row) + +SELECT lo_get(1002); + lo_get +-------- + \x5678 +(1 row) + +BEGIN; +SELECT lo_bulk_write(ARRAY[lo_open(1001, 131072), lo_open(1002, 131072)], ARRAY[E'\\x1111'::bytea, E'\\x2222'::bytea]); + lo_bulk_write +--------------- + 4 +(1 row) + +SELECT 
lo_close(fd) from (SELECT unnest(ARRAY[lo_open(1001, 131072), lo_open(1002, 131072)]) as fd) a; + lo_close +---------- + 0 + 0 +(2 rows) + +COMMIT; +SELECT lo_get(1001); + lo_get +-------- + \x1111 +(1 row) + +SELECT lo_get(1002); + lo_get +-------- + \x2222 +(1 row) + +SELECT lo_unlink(1001); + lo_unlink +----------- + 1 +(1 row) + +SELECT lo_unlink(1002); + lo_unlink +----------- + 1 +(1 row) + +-- Test mixed-mode lo_bulk_write (Fast-path and Fallback-path interleaving) +SELECT lo_create(1003); -- empty + lo_create +----------- + 1003 +(1 row) + +SELECT lo_put(lo_create(1004), 0, E'\\x1234'::bytea); -- 2 bytes + lo_put +-------- + +(1 row) + +SELECT lo_put(lo_create(1005), 0, E'\\x5678'::bytea); -- 2 bytes + lo_put +-------- + +(1 row) + +BEGIN; +-- fd1: 1003 (offset 0, size 0) -> Fast-path +-- fd2: 1004 (offset 0, size 2) -> Fallback (should trigger flush of fd1) +-- fd3: 1005 (offset 2, size 2) -> Fast-path +DO $$ +DECLARE + fd1 int; fd2 int; fd3 int; +BEGIN + fd1 := lo_open(1003, 131072); -- INV_WRITE + fd2 := lo_open(1004, 131072); + fd3 := lo_open(1005, 131072); + PERFORM lo_lseek(fd3, 2, 0); -- move to end for fast-path append + + PERFORM lo_bulk_write(ARRAY[fd1, fd2, fd3], ARRAY[E'\\xAAAA'::bytea, E'\\xBBBB'::bytea, E'\\xCCCC'::bytea]); + + PERFORM lo_close(fd1); + PERFORM lo_close(fd2); + PERFORM lo_close(fd3); +END $$; +COMMIT; +SELECT lo_get(1003); -- Expect \xaaaa + lo_get +-------- + \xaaaa +(1 row) + +SELECT lo_get(1004); -- Expect \xbbbb (overwritten) + lo_get +-------- + \xbbbb +(1 row) + +SELECT lo_get(1005); -- Expect \x5678cccc (appended) + lo_get +------------ + \x5678cccc +(1 row) + +SELECT lo_unlink(1003); + lo_unlink +----------- + 1 +(1 row) + +SELECT lo_unlink(1004); + lo_unlink +----------- + 1 +(1 row) + +SELECT lo_unlink(1005); + lo_unlink +----------- + 1 +(1 row) + +-- Test Savepoint and Rollback (lo_bulk_put) +BEGIN; +SELECT lo_bulk_put(ARRAY[5001, 5002], ARRAY[E'\\x11'::bytea, E'\\x22'::bytea]); + lo_bulk_put +------------- + 2 +(1 row) + +SAVEPOINT s1; +SELECT lo_bulk_put(ARRAY[5003, 5004], ARRAY[E'\\x33'::bytea, E'\\x44'::bytea]); + lo_bulk_put +------------- + 2 +(1 row) + +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT lo_get(5001); -- Expect \x11 + lo_get +-------- + \x11 +(1 row) + +SELECT lo_get(5003); -- Expect ERROR (not found) +ERROR: large object 5003 does not exist +SELECT lo_unlink(5001); + lo_unlink +----------- + 1 +(1 row) + +SELECT lo_unlink(5002); + lo_unlink +----------- + 1 +(1 row) + +-- Test Atomic Failure on Permissions (lo_bulk_write) +SELECT lo_create(1008); -- owned by superuser + lo_create +----------- + 1008 +(1 row) + +ALTER LARGE OBJECT 1008 OWNER TO regress_lo_user; +SELECT lo_create(1009); -- owned by superuser + lo_create +----------- + 1009 +(1 row) + +SET SESSION AUTHORIZATION regress_lo_user; +DO $$ +BEGIN + -- This should fail because 1009 is owned by superuser + PERFORM lo_bulk_write(ARRAY[lo_open(1008, 131072), lo_open(1009, 131072)], ARRAY[E'\\x11'::bytea, E'\\x22'::bytea]); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'Caught expected permission error: %', SQLERRM; +END $$; +NOTICE: Caught expected permission error: permission denied for large object 1009 +RESET SESSION AUTHORIZATION; +-- Verify atomicity: 1008 should NOT have been updated +SELECT lo_get(1008); -- Expect empty + lo_get +-------- + \x +(1 row) + +SELECT lo_unlink(1008); + lo_unlink +----------- + 1 +(1 row) + +SELECT lo_unlink(1009); + lo_unlink +----------- + 1 +(1 row) + +-- Test Array NULL handling +SELECT lo_bulk_put(ARRAY[6001, NULL, 6002], 
ARRAY[E'\\x11'::bytea, E'\\x22'::bytea, E'\\x33'::bytea]); +ERROR: array elements cannot be null +SELECT lo_bulk_put(ARRAY[7001, 7002], ARRAY[E'\\x11'::bytea, NULL]); +ERROR: array elements cannot be null +-- Trigger batch limit use-after-free bug +SELECT lo_bulk_put( + (SELECT array_agg(id::oid) FROM generate_series(1000000, 1000098) id), + (SELECT array_agg(E'\\x12'::bytea) FROM generate_series(1000000, 1000098)) +) > 0 AS bulk_put_success; + bulk_put_success +------------------ + t +(1 row) + +-- Test massive bulk insert exceeding all batch limits (1500 objects) +SELECT lo_bulk_put( + (SELECT array_agg(id::oid) FROM generate_series(2000000, 2001499) id), + (SELECT array_agg(E'\\x12'::bytea) FROM generate_series(2000000, 2001499)) +) > 0 AS massive_bulk_put_success; + massive_bulk_put_success +-------------------------- + t +(1 row) + DROP ROLE regress_lo_user; diff --git a/src/test/regress/sql/largeobject.sql b/src/test/regress/sql/largeobject.sql index a4aee02e3a4..800cd8f594d 100644 --- a/src/test/regress/sql/largeobject.sql +++ b/src/test/regress/sql/largeobject.sql @@ -324,4 +324,100 @@ ROLLBACK; -- Clean up DROP TABLE lotest_stash_values; +-- Test bulk apis +SET bytea_output to hex; +SELECT lo_bulk_put(ARRAY[1001, 1002], ARRAY[E'\\x1234'::bytea, E'\\x5678'::bytea]); +SELECT lo_get(1001); +SELECT lo_get(1002); +BEGIN; +SELECT lo_bulk_write(ARRAY[lo_open(1001, 131072), lo_open(1002, 131072)], ARRAY[E'\\x1111'::bytea, E'\\x2222'::bytea]); +SELECT lo_close(fd) from (SELECT unnest(ARRAY[lo_open(1001, 131072), lo_open(1002, 131072)]) as fd) a; +COMMIT; +SELECT lo_get(1001); +SELECT lo_get(1002); +SELECT lo_unlink(1001); +SELECT lo_unlink(1002); + +-- Test mixed-mode lo_bulk_write (Fast-path and Fallback-path interleaving) +SELECT lo_create(1003); -- empty +SELECT lo_put(lo_create(1004), 0, E'\\x1234'::bytea); -- 2 bytes +SELECT lo_put(lo_create(1005), 0, E'\\x5678'::bytea); -- 2 bytes + +BEGIN; +-- fd1: 1003 (offset 0, size 0) -> Fast-path +-- fd2: 1004 (offset 0, size 2) -> Fallback (should trigger flush of fd1) +-- fd3: 1005 (offset 2, size 2) -> Fast-path +DO $$ +DECLARE + fd1 int; fd2 int; fd3 int; +BEGIN + fd1 := lo_open(1003, 131072); -- INV_WRITE + fd2 := lo_open(1004, 131072); + fd3 := lo_open(1005, 131072); + PERFORM lo_lseek(fd3, 2, 0); -- move to end for fast-path append + + PERFORM lo_bulk_write(ARRAY[fd1, fd2, fd3], ARRAY[E'\\xAAAA'::bytea, E'\\xBBBB'::bytea, E'\\xCCCC'::bytea]); + + PERFORM lo_close(fd1); + PERFORM lo_close(fd2); + PERFORM lo_close(fd3); +END $$; +COMMIT; + +SELECT lo_get(1003); -- Expect \xaaaa +SELECT lo_get(1004); -- Expect \xbbbb (overwritten) +SELECT lo_get(1005); -- Expect \x5678cccc (appended) + +SELECT lo_unlink(1003); +SELECT lo_unlink(1004); +SELECT lo_unlink(1005); + +-- Test Savepoint and Rollback (lo_bulk_put) +BEGIN; +SELECT lo_bulk_put(ARRAY[5001, 5002], ARRAY[E'\\x11'::bytea, E'\\x22'::bytea]); +SAVEPOINT s1; +SELECT lo_bulk_put(ARRAY[5003, 5004], ARRAY[E'\\x33'::bytea, E'\\x44'::bytea]); +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT lo_get(5001); -- Expect \x11 +SELECT lo_get(5003); -- Expect ERROR (not found) +SELECT lo_unlink(5001); +SELECT lo_unlink(5002); + +-- Test Atomic Failure on Permissions (lo_bulk_write) +SELECT lo_create(1008); -- owned by superuser +ALTER LARGE OBJECT 1008 OWNER TO regress_lo_user; +SELECT lo_create(1009); -- owned by superuser + +SET SESSION AUTHORIZATION regress_lo_user; +DO $$ +BEGIN + -- This should fail because 1009 is owned by superuser + PERFORM lo_bulk_write(ARRAY[lo_open(1008, 131072), lo_open(1009, 
131072)], ARRAY[E'\\x11'::bytea, E'\\x22'::bytea]); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'Caught expected permission error: %', SQLERRM; +END $$; +RESET SESSION AUTHORIZATION; + +-- Verify atomicity: 1008 should NOT have been updated +SELECT lo_get(1008); -- Expect empty +SELECT lo_unlink(1008); +SELECT lo_unlink(1009); + +-- Test Array NULL handling +SELECT lo_bulk_put(ARRAY[6001, NULL, 6002], ARRAY[E'\\x11'::bytea, E'\\x22'::bytea, E'\\x33'::bytea]); +SELECT lo_bulk_put(ARRAY[7001, 7002], ARRAY[E'\\x11'::bytea, NULL]); + +-- Trigger batch limit use-after-free bug +SELECT lo_bulk_put( + (SELECT array_agg(id::oid) FROM generate_series(1000000, 1000098) id), + (SELECT array_agg(E'\\x12'::bytea) FROM generate_series(1000000, 1000098)) +) > 0 AS bulk_put_success; + +-- Test massive bulk insert exceeding all batch limits (1500 objects) +SELECT lo_bulk_put( + (SELECT array_agg(id::oid) FROM generate_series(2000000, 2001499) id), + (SELECT array_agg(E'\\x12'::bytea) FROM generate_series(2000000, 2001499)) +) > 0 AS massive_bulk_put_success; + DROP ROLE regress_lo_user; -- 2.54.0.545.g6539524ca2-goog