diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index e99ebd2..0c80371 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ +OBJS = dht.o dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \ sinvaladt.o standby.o diff --git a/src/backend/storage/ipc/dht.c b/src/backend/storage/ipc/dht.c new file mode 100644 index 0000000..1fc6aa1 --- /dev/null +++ b/src/backend/storage/ipc/dht.c @@ -0,0 +1,1427 @@ +/*------------------------------------------------------------------------- + * + * dht.c + * Concurrent hash tables backed by dynamic shared memory areas. + * + * This is an open hashing hash table, with a linked list at each table + * entry. It supports dynamic resizing, as required to prevent the linked + * lists from growing too long on average. Currently, only growing is + * supported: the hash table never becomes smaller. + * + * To deal with currency, it has a fixed size set of partitions, each of which + * is independently locked. Each bucket maps to a partition; so insert, find + * and iterate operations normally only acquire one lock. Therefore, good + * concurrency is achieved whenever they don't collide at the lock partition + * level. However, when a resize operation begins, all partition locks must + * be acquired simultaneously for a brief period. This is only expected to + * happen a small number of times until a stable size is found, since growth is + * geometric. + * + * Resizing is done incrementally so that no individual insert operation pays + * for the potentially large cost of splitting all buckets. A resize + * operation begins when any partition exceeds a certain load factor, and + * splits every bucket into two new buckets but doesn't yet transfer any items + * into the new buckets. Then, future dht_release operations involving an + * exclusive lock transfer one item from an old bucket into the appropriate + * new bucket. If possible, an item from the already-locked partition is + * migrated. If none remain, a lock on another partition will be acquired and + * one item from that partition will be migrated. This continues until there + * are no items left in old buckets, and the resize operation is finished. + * While a resize is in progress, find, insert and delete operations must look + * in a new bucket and an old bucket, but this is not too expensive since they + * are covered by the same lock. That is, a given hash value maps to the same + * lock partition, no matter how many times buckets have been split. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/ipc/dht.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "storage/dht.h" +#include "storage/dsa.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/memutils.h" + +/* + * An item in the hash table. This wraps the user's entry object in an + * envelop that holds a pointer back to the bucket and a pointer to the next + * item in the bucket. + */ +struct dht_hash_table_item +{ + /* The hashed key, to avoid having to recompute it. */ + dht_hash hash; + /* The next item in the same bucket. */ + dsa_pointer next; + /* The user's entry object follows here. */ + char entry[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* The number of partitions for locking purposes. */ +#define DHT_NUM_PARTITIONS_LOG2 7 +#define DHT_NUM_PARTITIONS (1 << DHT_NUM_PARTITIONS_LOG2) + +/* A magic value used to identify DHT hash tables. */ +#define DHT_MAGIC 0x75ff6a20 + +/* + * Tracking information for each lock partition. Initially, each partition + * corresponds to one bucket, but each time the hash table grows, the buckets + * covered by each partition split so the number of buckets covered doubles. + * + * We might want to add padding here so that each partition is on a different + * cache line, but doing so would bloat this structure considerably. + */ +typedef struct +{ + LWLock lock; /* Protects all buckets in this partition. */ + Size count; /* # of items in active buckets */ + Size old_count; /* # of items in old buckets */ + Index next_bucket_to_split; /* do_increment_resize progress + * tracker */ +} dht_partition; + +/* + * The head object for a hash table. This will be stored in dynamic shared + * memory. + */ +typedef struct +{ + dht_hash_table_handle handle; + uint32 magic; + dht_partition partitions[DHT_NUM_PARTITIONS]; + int lwlock_tranche_id; + char lwlock_tranche_name[NAMEDATALEN]; + + /* + * The following members are written to only when ALL partitions locks are + * held. They can be read when any one partition lock is held. + */ + + /* Number of buckets expressed as power of 2 (8 = 256 buckets). */ + Size size_log2; /* log2(number of buckets) */ + dsa_pointer buckets; /* current bucket array */ + dsa_pointer old_buckets; /* previous bucket array, if splitting */ +} dht_hash_table_control; + +/* + * Per-backend state for a dynamic hash table. + */ +struct dht_hash_table +{ + dsa_area *area; /* Backing dynamic shared memory area. */ + dht_parameters params; /* Parameters. */ + dht_hash_table_control *control; /* Control object in DSM. */ + dsa_pointer *buckets; /* Current bucket pointers in DSM. */ + Size size_log2; /* log2(number of buckets) */ + dsa_pointer *old_buckets; /* Old bucket pointers in DSM. */ + LWLockTranche lwlock_tranche; /* Tranche ID for this table's + * lwlocks. */ + bool exclusively_locked; /* Is an exclusive partition lock + * held? */ +}; + +/* Given a pointer to an entry, find the pointer to the item that holds it. */ +#define ITEM_FROM_ENTRY(entry) \ + ((dht_hash_table_item *)((char *)entry - \ + offsetof(dht_hash_table_item, entry))) + +/* How many resize operations (bucket splits) have there been? */ +#define NUM_SPLITS(size_log2) \ + (size_log2 - DHT_NUM_PARTITIONS_LOG2) + +/* How many buckets are there in each partition at a given size? */ +#define BUCKETS_PER_PARTITION(size_log2) \ + (1 << NUM_SPLITS(size_log2)) + +/* Max entries before we need to grow. Half + quarter = 75% load factor. */ +#define MAX_COUNT_PER_PARTITION(hash_table) \ + (BUCKETS_PER_PARTITION(hash_table->size_log2) / 2 + \ + BUCKETS_PER_PARTITION(hash_table->size_log2) / 4) + +/* Choose partition based on the highest order bits of the hash. */ +#define PARTITION_FOR_HASH(hash) \ + (hash >> ((sizeof(dht_hash) * CHAR_BIT) - DHT_NUM_PARTITIONS_LOG2)) + +/* + * Find the bucket index for a given hash and table size. Each time the table + * doubles in size, the appropriate bucket for a given hash value doubles and + * possibly adds one, depending on the newly revealed bit, so that all buckets + * are split. + */ +#define BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, size_log2) \ + (hash >> ((sizeof(dht_hash) * CHAR_BIT) - (size_log2))) + +/* The index of the first bucket in a given partition. */ +#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2) \ + ((partition) << NUM_SPLITS(size_log2)) + +/* The head of the active bucket for a given hash value (lvalue). */ +#define BUCKET_FOR_HASH(hash_table, hash) \ + (hash_table->buckets[ \ + BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, \ + hash_table->size_log2)]) + +/* The head of the old bucket for a given hash value (lvalue). */ +#define OLD_BUCKET_FOR_HASH(hash_table, hash) \ + (hash_table->old_buckets[ \ + BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, \ + hash_table->size_log2 - 1)]) + +/* Is a resize currently in progress? */ +#define RESIZE_IN_PROGRESS(hash_table) \ + (hash_table->old_buckets != NULL) + +static void delete_item(dht_hash_table *hash_table, dht_hash_table_item * item); +static void advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head, + dsa_pointer *next); +static dsa_pointer next_in_bucket(dht_iterator *iterator); +static bool begin_resize(dht_hash_table *hash_table, Size new_size); +static void finish_resize(dht_hash_table *hash_table); +static void do_incremental_resize(dht_hash_table *hash_table, int p); +static void search_for_resize_work(dht_hash_table *hash_table, + int start_partition); +static inline void ensure_valid_bucket_pointers(dht_hash_table *hash_table); +static inline dht_hash_table_item *find_in_bucket(dht_hash_table *hash_table, + const void *key, + dsa_pointer item_pointer); +static void insert_item_into_bucket(dht_hash_table *hash_table, + dsa_pointer item_pointer, + dht_hash_table_item * item, + dsa_pointer *bucket); +static dht_hash_table_item *insert_into_bucket(dht_hash_table *hash_table, + const void *key, + dsa_pointer *bucket); +static bool delete_key_from_bucket(dht_hash_table *hash_table, const void *key, + dsa_pointer *bucket_head); +static bool delete_item_from_bucket(dht_hash_table *hash_table, + dht_hash_table_item * item, + dsa_pointer *bucket_head); + +#define PARTITION_LOCK(hash_table, i) \ + (&hash_table->control->partitions[i].lock) + +/* + * Create a new hash table backed by the given dynamic shared area, with the + * given parameters. + */ +dht_hash_table * +dht_create(dsa_area *area, const dht_parameters *params) +{ + dht_hash_table *hash_table; + dsa_pointer control; + + /* Allocate the backend-local object representing the hash table. */ + hash_table = palloc(sizeof(dht_hash_table)); + + /* Allocate the control object in shared memory. */ + control = dsa_allocate(area, sizeof(dht_hash_table_control)); + if (!DsaPointerIsValid(control)) + { + /* Can't allocate memory, give up. */ + pfree(hash_table); + return NULL; + } + + /* Set up the local and shared hash table structs. */ + hash_table->area = area; + hash_table->params = *params; + hash_table->control = dsa_get_address(area, control); + hash_table->control->handle = control; + hash_table->control->magic = DHT_MAGIC; + + /* Set up the tranche of locks, and register it in this process. */ + hash_table->control->lwlock_tranche_id = params->tranche_id; + strcpy(hash_table->control->lwlock_tranche_name, + params->tranche_name); + hash_table->lwlock_tranche.name = params->tranche_name; + hash_table->lwlock_tranche.array_base = + &hash_table->control->partitions[0].lock; + hash_table->lwlock_tranche.array_stride = sizeof(dht_partition); + LWLockRegisterTranche(hash_table->control->lwlock_tranche_id, + &hash_table->lwlock_tranche); + + /* Set up the array of lock partitions. */ + { + int i; + + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + { + LWLockInitialize(PARTITION_LOCK(hash_table, i), + hash_table->control->lwlock_tranche_id); + hash_table->control->partitions[i].count = 0; + } + } + + hash_table->exclusively_locked = false; + + /* + * Set up the initial array of buckets. Our initial size is the same as + * the number of partitions. + */ + hash_table->control->size_log2 = DHT_NUM_PARTITIONS_LOG2; + hash_table->control->buckets = + dsa_allocate(area, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS); + hash_table->buckets = dsa_get_address(area, + hash_table->control->buckets); + hash_table->control->old_buckets = InvalidDsaPointer; + hash_table->old_buckets = NULL; + memset(hash_table->buckets, 0, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS); + + return hash_table; +} + +/* + * Attach to an existing hash table using a handle. + */ +dht_hash_table * +dht_attach(dsa_area *area, const dht_parameters *params, + dht_hash_table_handle handle) +{ + dht_hash_table *hash_table; + dsa_pointer control; + + /* Allocate the backend-local object representing the hash table. */ + hash_table = palloc(sizeof(dht_hash_table)); + + /* Find the control object in shared memory. */ + control = handle; + + /* Set up the local hash table struct. */ + hash_table->area = area; + hash_table->params = *params; + hash_table->control = dsa_get_address(area, control); + hash_table->exclusively_locked = false; + Assert(hash_table->control->magic == DHT_MAGIC); + + /* Register the tranche of locks in this process. */ + hash_table->lwlock_tranche.name = + hash_table->control->lwlock_tranche_name; + hash_table->lwlock_tranche.array_base = + &hash_table->control->partitions[0].lock; + hash_table->lwlock_tranche.array_stride = sizeof(dht_partition); + LWLockRegisterTranche(hash_table->control->lwlock_tranche_id, + &hash_table->lwlock_tranche); + + return hash_table; +} + +/* + * Detach from a hash table. This frees backend-local resources associated + * with the hash table, but the hash table will continue to exist until it is + * either explicitly destroyed (by a backend that is still attached to it), or + * the area that backs it is returned to the operating system. + */ +void +dht_detach(dht_hash_table *hash_table) +{ + /* The hash table may have been destroyed. Just free local memory. */ + pfree(hash_table); +} + +/* + * Destroy a hash table, returning all memory to the area. The caller must be + * certain that no other backend will attempt to access the hash table before + * calling this function. Other backend must explicitly call dht_detach to + * free up backend-local memory associated with the hash table. The backend + * that calls dht_destroy must not call dht_detach. + */ +void +dht_destroy(dht_hash_table *hash_table) +{ + dht_iterator iterator; + void *entry; + + Assert(hash_table->control->magic == DHT_MAGIC); + + /* Free all the entries. */ + dht_iterate_begin(hash_table, &iterator, true); + while ((entry = dht_iterate_next(&iterator))) + dht_iterate_delete(&iterator); + dht_iterate_end(&iterator); + + /* + * Vandalize the control block to help catch programming errors where + * other backends access the memory formerly occupied by this hash table. + */ + hash_table->control->magic = 0; + + /* Free the active and (if appropriate) old table, and control object. */ + if (DsaPointerIsValid(hash_table->old_buckets)) + dsa_free(hash_table->area, hash_table->control->old_buckets); + dsa_free(hash_table->area, hash_table->control->buckets); + dsa_free(hash_table->area, hash_table->control->handle); + + pfree(hash_table); +} + +/* + * Get a handle that can be used by other processes to attach to this hash + * table. + */ +dht_hash_table_handle +dht_get_hash_table_handle(dht_hash_table *hash_table) +{ + Assert(hash_table->control->magic == DHT_MAGIC); + + return hash_table->control->handle; +} + +/* + * Look up an entry, given a key. Returns a pointer to an entry if one can be + * found with the given key. Returns NULL if the key is not found. If a + * non-NULL value is returned, the entry is locked and must be released by + * calling dht_release. If an error is raised before dht_release is called, + * the lock will be released automatically, but the caller must take care to + * ensure that the entry is not left corrupted. The lock mode is either + * shared or exclusive depending on 'exclusive'. + * + * Note that the lock held is in fact an LWLock, so interrupts will be held + * on return from this function, and not resumed until dht_release is called. + * It is a very good idea for the caller to release the lock quickly. + */ +void * +dht_find(dht_hash_table *hash_table, const void *key, bool exclusive) +{ + Size hash; + Size partition; + dht_hash_table_item *item; + + hash = hash_table->params.hash_function(key, hash_table->params.key_size); + partition = PARTITION_FOR_HASH(hash); + + Assert(hash_table->control->magic == DHT_MAGIC); + Assert(!hash_table->exclusively_locked); + + LWLockAcquire(PARTITION_LOCK(hash_table, partition), + exclusive ? LW_EXCLUSIVE : LW_SHARED); + ensure_valid_bucket_pointers(hash_table); + + /* Search the active bucket. */ + item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash)); + + /* If not found and a resize is in progress, search the old bucket too. */ + if (!item && RESIZE_IN_PROGRESS(hash_table)) + item = find_in_bucket(hash_table, key, + OLD_BUCKET_FOR_HASH(hash_table, hash)); + + if (!item) + { + /* Not found. */ + LWLockRelease(PARTITION_LOCK(hash_table, partition)); + return NULL; + } + else + { + /* The caller will free the lock by calling dht_release. */ + hash_table->exclusively_locked = exclusive; + return &item->entry; + } +} + +/* + * Returns a pointer to an exclusively locked item which must be released with + * dht_release. If the key is found in the hash table, 'found' is set to true + * and a pointer to the existing entry is returned. If the key is not found, + * 'found' is set to false, and a pointer to a newly created entry is related. + */ +void * +dht_find_or_insert(dht_hash_table *hash_table, + const void *key, + bool *found) +{ + Size hash; + Size partition_index; + dht_partition *partition; + dht_hash_table_item *item; + + hash = hash_table->params.hash_function(key, hash_table->params.key_size); + partition_index = PARTITION_FOR_HASH(hash); + partition = &hash_table->control->partitions[partition_index]; + + Assert(hash_table->control->magic == DHT_MAGIC); + Assert(!hash_table->exclusively_locked); + +restart: + LWLockAcquire(PARTITION_LOCK(hash_table, partition_index), + LW_EXCLUSIVE); + ensure_valid_bucket_pointers(hash_table); + + /* Search the active bucket. */ + item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash)); + + /* If not found and a resize is in progress, search the old bucket too. */ + if (!item && RESIZE_IN_PROGRESS(hash_table)) + item = find_in_bucket(hash_table, key, + OLD_BUCKET_FOR_HASH(hash_table, hash)); + + if (item) + *found = true; + else + { + *found = false; + + /* Check if we are getting too full. */ + if (partition->count > MAX_COUNT_PER_PARTITION(hash_table) && + !RESIZE_IN_PROGRESS(hash_table)) + { + /* + * The load factor (= keys / buckets) for all buckets protected by + * this partition is > 0.75. Presumably the same applies + * generally across the whole hash table (though we don't attempt + * to track that directly to avoid contention on some kind of + * central counter; we just assume that this partition is + * representative). This is a good time to start a new resize + * operation, if there isn't one already in progress. + * + * Give up our existing lock first, because resizing needs to + * reacquire all the locks in the right order. + */ + LWLockRelease(PARTITION_LOCK(hash_table, partition_index)); + if (!begin_resize(hash_table, hash_table->size_log2 + 1)) + { + /* Out of memory. */ + return NULL; + } + + goto restart; + } + + /* Finally we can try to insert the new item. */ + item = insert_into_bucket(hash_table, key, + &BUCKET_FOR_HASH(hash_table, hash)); + if (item) + { + item->hash = hash; + /* Adjust per-lock-partition counter for load factor knowledge. */ + ++partition->count; + } + } + + if (!item) + { + /* Out of memory. */ + LWLockRelease(PARTITION_LOCK(hash_table, partition_index)); + return NULL; + } + else + { + /* The caller must release the lock with dht_release. */ + hash_table->exclusively_locked = true; + return &item->entry; + } +} + +/* + * Remove an entry by key. Returns true if the key was found and the + * corresponding entry was removed. + * + * To delete an entry that you already have a pointer to, see + * dht_delete_entry. + */ +bool +dht_delete_key(dht_hash_table *hash_table, const void *key) +{ + Size hash; + Size partition; + bool found; + + hash = hash_table->params.hash_function(key, hash_table->params.key_size); + + Assert(hash_table->control->magic == DHT_MAGIC); + Assert(!hash_table->exclusively_locked); + + partition = PARTITION_FOR_HASH(hash); + + LWLockAcquire(PARTITION_LOCK(hash_table, partition), LW_EXCLUSIVE); + ensure_valid_bucket_pointers(hash_table); + + /* + * Try the active bucket first, and then the outgoing bucket if we are + * resizing. It can't be in both. + */ + if (delete_key_from_bucket(hash_table, key, + &BUCKET_FOR_HASH(hash_table, hash))) + { + Assert(hash_table->control->partitions[partition].count > 0); + found = true; + --hash_table->control->partitions[partition].count; + } + else if (RESIZE_IN_PROGRESS(hash_table) && + delete_key_from_bucket(hash_table, key, + &OLD_BUCKET_FOR_HASH(hash_table, hash))) + { + Assert(hash_table->control->partitions[partition].old_count > 0); + found = true; + --hash_table->control->partitions[partition].old_count; + } + else + found = false; + + LWLockRelease(PARTITION_LOCK(hash_table, partition)); + + return found; +} + +/* + * Remove an entry. The entry must already be exclusively locked, and must + * have been obtained by dht_find or dht_find_or_insert. Note that this + * function releases the lock just like dht_release. + * + * To delete an entry by key, see dht_delete_key. To delete an entry while + * iterating, see dht_iterator_delete. + */ +void +dht_delete_entry(dht_hash_table *hash_table, void *entry) +{ + dht_hash_table_item *item = ITEM_FROM_ENTRY(entry); + Size partition = PARTITION_FOR_HASH(item->hash); + + Assert(hash_table->control->magic == DHT_MAGIC); + Assert(hash_table->exclusively_locked); + + delete_item(hash_table, item); + hash_table->exclusively_locked = false; + LWLockRelease(PARTITION_LOCK(hash_table, partition)); +} + +/* + * Unlock an entry which was locked by dht_find or dht_find_or_insert. + */ +void +dht_release(dht_hash_table *hash_table, void *entry) +{ + dht_hash_table_item *item = ITEM_FROM_ENTRY(entry); + Size partition_index = PARTITION_FOR_HASH(item->hash); + bool deferred_resize_work = false; + + Assert(hash_table->control->magic == DHT_MAGIC); + + /* + * If there is a resize in progress and we took an exclusive lock, try to + * migrate a key from the old bucket array to the new bucket array. + * Ideally, we'll find some work to do in the partition on which we + * already hold a lock, but if not, we'll make a note to search other lock + * partitions after releasing the lock we currently hold. + */ + if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked) + { + dht_partition *partition; + + partition = &hash_table->control->partitions[partition_index]; + + if (partition->old_count > 0) + { + /* Do some resizing work in this partition. */ + do_incremental_resize(hash_table, partition_index); + } + else + { + /* Nothing left to do in this partition. Look elsewhere. */ + deferred_resize_work = true; + } + } + + hash_table->exclusively_locked = false; + LWLockRelease(PARTITION_LOCK(hash_table, partition_index)); + + /* + * If we need to contribute some work to the ongoing resize effort, and we + * didn't find something to migrate in our own partition, go search for + * such work in other partitions. We'll start with the one after ours. + */ + if (deferred_resize_work) + search_for_resize_work(hash_table, + (partition_index + 1) % DHT_NUM_PARTITIONS); +} + +/* + * Begin iterating through the whole hash table. The caller must supply a + * dht_iterator object, which can then be used to call dht_iterate_next to get + * values until the end is reached. + */ +void +dht_iterate_begin(dht_hash_table *hash_table, + dht_iterator *iterator, + bool exclusive) +{ + Assert(hash_table->control->magic == DHT_MAGIC); + + iterator->hash_table = hash_table; + iterator->exclusive = exclusive; + iterator->partition = 0; + iterator->bucket = 0; + iterator->item = NULL; + iterator->last_item_pointer = InvalidDsaPointer; + iterator->locked = false; + + /* Snapshot the size (arbitrary lock to prevent size changing). */ + LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED); + ensure_valid_bucket_pointers(hash_table); + iterator->table_size_log2 = hash_table->size_log2; + LWLockRelease(PARTITION_LOCK(hash_table, 0)); +} + +/* + * Delete the entry that was most recently returned by dht_iterate_next. The + * iterator must have been initialized in exclusive lock mode. + */ +void +dht_iterate_delete(dht_iterator *iterator) +{ + Assert(iterator->hash_table->control->magic == DHT_MAGIC); + Assert(iterator->exclusive); + Assert(iterator->item != NULL); + delete_item(iterator->hash_table, iterator->item); + iterator->item = NULL; +} + +/* + * Move to the next item in the hash table. Returns a pointer to an entry, or + * NULL if the end of the hash table has been reached. The item is locked in + * exclusive or shared mode depending on the argument given to + * dht_iterate_begin. The caller can optionally release the lock by calling + * dht_iterate_release, and then call dht_iterate_next again to move to the + * next entry. If the iteration is in exclusive mode, client code can also + * call dht_iterate_delete. When the end of the hash table is reached, or at + * any time, the client may call dht_iterate_end to abandon iteration. + */ +void * +dht_iterate_next(dht_iterator *iterator) +{ + dsa_pointer item_pointer; + + Assert(iterator->hash_table->control->magic == DHT_MAGIC); + + while (iterator->partition < DHT_NUM_PARTITIONS) + { + if (!iterator->locked) + { + LWLockAcquire(PARTITION_LOCK(iterator->hash_table, + iterator->partition), + iterator->exclusive ? LW_EXCLUSIVE : LW_SHARED); + iterator->locked = true; + } + + item_pointer = next_in_bucket(iterator); + if (DsaPointerIsValid(item_pointer)) + { + /* Remember this item, so that we can step over it next time. */ + iterator->last_item_pointer = item_pointer; + iterator->item = dsa_get_address(iterator->hash_table->area, + item_pointer); + return &(iterator->item->entry); + } + + /* We have reached the end of the bucket. */ + iterator->last_item_pointer = InvalidDsaPointer; + iterator->bucket += 1; + + /* + * If the last bucket was split while we were iterating, then we have + * been doing some expensive bucket merging to get each item. Now + * that we've completed that whole pre-split bucket (by merging all + * its descendants), we can update the table size of our iterator and + * start iterating cheaply again. + */ + iterator->bucket <<= + iterator->hash_table->size_log2 - iterator->table_size_log2; + iterator->table_size_log2 = iterator->hash_table->size_log2; + /* Have we gone past the last bucket in this partition? */ + if (iterator->bucket >= + BUCKET_INDEX_FOR_PARTITION(iterator->partition + 1, + iterator->table_size_log2)) + { + /* No more buckets. Try next partition. */ + LWLockRelease(PARTITION_LOCK(iterator->hash_table, + iterator->partition)); + iterator->locked = false; + ++iterator->partition; + iterator->bucket = + BUCKET_INDEX_FOR_PARTITION(iterator->partition, + iterator->table_size_log2); + } + } + iterator->item = NULL; + return NULL; +} + +/* + * Release the most recently obtained lock. This can optionally be called in + * between calls to dht_iterator_next to allow other processes to access the + * same partition of the hash table. + */ +void +dht_iterate_release(dht_iterator *iterator) +{ + Assert(iterator->locked); + LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition)); + iterator->locked = false; +} + +/* + * Terminate iteration. This must be called after iteration completes, + * whether or not the end was reached. The iterator object may then be reused + * for another iteration. + */ +void +dht_iterate_end(dht_iterator *iterator) +{ + Assert(iterator->hash_table->control->magic == DHT_MAGIC); + if (iterator->locked) + LWLockRelease(PARTITION_LOCK(iterator->hash_table, + iterator->partition)); +} + +/* + * Print out debugging information about the internal state of the hash table. + */ +void +dht_dump(dht_hash_table *hash_table) +{ + Size i; + Size j; + + Assert(hash_table->control->magic == DHT_MAGIC); + + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED); + + ensure_valid_bucket_pointers(hash_table); + + fprintf(stderr, + "hash table size = %zu\n", (Size) 1 << hash_table->size_log2); + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + { + dht_partition *partition = &hash_table->control->partitions[i]; + Size begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2); + Size end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2); + + fprintf(stderr, " partition %zu\n", i); + fprintf(stderr, + " active buckets (key count = %zu)\n", partition->count); + + for (j = begin; j < end; ++j) + { + Size count = 0; + dsa_pointer bucket = hash_table->buckets[j]; + + while (DsaPointerIsValid(bucket)) + { + dht_hash_table_item *item; + + item = dsa_get_address(hash_table->area, bucket); + + bucket = item->next; + ++count; + } + fprintf(stderr, " bucket %zu (key count = %zu)\n", j, count); + } + if (RESIZE_IN_PROGRESS(hash_table)) + { + Size begin; + Size end; + + begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1); + end = BUCKET_INDEX_FOR_PARTITION(i + 1, + hash_table->size_log2 - 1); + + fprintf(stderr, " old buckets (key count = %zu)\n", + partition->old_count); + + for (j = begin; j < end; ++j) + { + Size count = 0; + dsa_pointer bucket = hash_table->old_buckets[j]; + + while (DsaPointerIsValid(bucket)) + { + dht_hash_table_item *item; + + item = dsa_get_address(hash_table->area, bucket); + + bucket = item->next; + ++count; + } + fprintf(stderr, + " bucket %zu (key count = %zu)\n", j, count); + } + } + } + + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + LWLockRelease(PARTITION_LOCK(hash_table, i)); +} + +/* + * Delete a locked item to which we have a pointer. + */ +static void +delete_item(dht_hash_table *hash_table, dht_hash_table_item * item) +{ + Size hash = item->hash; + Size partition = PARTITION_FOR_HASH(hash); + + Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, partition))); + + /* + * Try the active bucket first, and then the outgoing bucket if we are + * resizing. It can't be in both. + */ + if (delete_item_from_bucket(hash_table, item, + &BUCKET_FOR_HASH(hash_table, hash))) + { + --hash_table->control->partitions[partition].count; + Assert(hash_table->control->partitions[partition].count >= 0); + } + else if (RESIZE_IN_PROGRESS(hash_table) && + delete_item_from_bucket(hash_table, item, + &OLD_BUCKET_FOR_HASH(hash_table, hash))) + { + --hash_table->control->partitions[partition].old_count; + Assert(hash_table->control->partitions[partition].old_count >= 0); + } + else + { + Assert(false); + } +} + +/* + * Find the item that comes after iterator->last_item_pointer but before + * *next, and write it into *next. + * + * Calling this on several buckets allows us to 'merge' the items from a + * bucket with its ancestors and descendents during resize operations, and + * iterate through the items in all of them in a stable order, despite items + * being able to move while we iterate. + * + * This works because buckets are sorted by descending item pointer. + */ +static void +advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head, + dsa_pointer *next) +{ + dht_hash_table_item *item; + + while (DsaPointerIsValid(bucket_head)) + { + item = dsa_get_address(iterator->hash_table->area, + bucket_head); + if ((!DsaPointerIsValid(iterator->last_item_pointer) || + bucket_head < iterator->last_item_pointer) && + bucket_head > *next && + BUCKET_INDEX_FOR_HASH_AND_SIZE(item->hash, + iterator->table_size_log2) == + iterator->bucket) + { + *next = bucket_head; + return; + } + bucket_head = item->next; + } +} + +/* + * Find the next item in the bucket we are visiting. If a resize is in + * progress or the bucket has been split since we started iterating over this + * bucket, we may need to combine items from multiple buckets to synthesize + * the original bucket. + */ +static dsa_pointer +next_in_bucket(dht_iterator *iterator) +{ + Size bucket; + Size begin; + Size end; + dsa_pointer next = InvalidDsaPointer; + + Assert(LWLockHeldByMe(PARTITION_LOCK(iterator->hash_table, + iterator->partition))); + + /* Handle old buckets if a resize is in progress. */ + if (RESIZE_IN_PROGRESS(iterator->hash_table)) + { + if (iterator->table_size_log2 == iterator->hash_table->size_log2) + { + /* + * The table hasn't grown since we started iterating, but there + * was already a resize in progress so there may be items in an + * old bucket that belong in the bucket we're iterating over which + * haven't been migrated yet. + */ + advance_iterator(iterator, + iterator->hash_table->old_buckets[iterator->bucket / 2], + &next); + } + else + { + /* + * The bucket has been split one or more times, and a resize is + * still in progress so there may be items still in an old bucket. + * If N splits have occurred, there will be 2^N current buckets + * that correspond to the one original bucket. + */ + Assert(iterator->hash_table->size_log2 > iterator->table_size_log2); + begin = iterator->bucket << + (iterator->hash_table->size_log2 - iterator->table_size_log2 - 1); + end = (iterator->bucket + 1) << + (iterator->hash_table->size_log2 - iterator->table_size_log2 - 1); + for (bucket = begin; bucket < end; bucket += 1) + advance_iterator(iterator, + iterator->hash_table->old_buckets[bucket], + &next); + } + } + + /* + * Scan the bucket. This is a loop because it may have split any number + * of times since we started iterating through it, but in the common case + * we just loop once. + */ + begin = iterator->bucket << + (iterator->hash_table->size_log2 - iterator->table_size_log2); + end = (iterator->bucket + 1) << + (iterator->hash_table->size_log2 - iterator->table_size_log2); + for (bucket = begin; bucket < end; ++bucket) + advance_iterator(iterator, + iterator->hash_table->buckets[iterator->bucket], + &next); + + return next; +} + +/* + * Grow the hash table if necessary to the requested number of buckets. The + * requested size must be double some previously observed size. Returns true + * if the table was successfully expanded or found to be big enough already + * (because another backend expanded it). Returns false for out-of-memory. + * The process of transferring items from the old table to the new one will be + * carried out with a small tax on all future operations until the work is + * done. + */ +static bool +begin_resize(dht_hash_table *hash_table, Size new_size_log2) +{ + int i; + dsa_pointer new_buckets; + bool result; + + /* + * Acquire the locks for all lock partitions. This is expensive, but we + * shouldn't have to do it many times. + */ + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + { + LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE); + if (i == 0 && hash_table->control->size_log2 >= new_size_log2) + { + /* + * Another backend has already increased the size; we can avoid + * obtaining all the locks and return early. + */ + LWLockRelease(PARTITION_LOCK(hash_table, 0)); + return true; + } + } + + Assert(new_size_log2 == hash_table->control->size_log2 + 1); + Assert(!RESIZE_IN_PROGRESS(hash_table)); + + /* Allocate the space for the new table. */ + new_buckets = dsa_allocate(hash_table->area, + sizeof(dsa_pointer) * (1 << new_size_log2)); + if (DsaPointerIsValid(new_buckets)) + { + result = true; + hash_table->control->old_buckets = hash_table->control->buckets; + hash_table->control->buckets = new_buckets; + hash_table->control->size_log2 = new_size_log2; + hash_table->buckets = dsa_get_address(hash_table->area, + hash_table->control->buckets); + hash_table->old_buckets = + dsa_get_address(hash_table->area, + hash_table->control->old_buckets); + hash_table->size_log2 = new_size_log2; + /* InvalidDsaPointer in every bucket */ + memset(hash_table->buckets, 0, + sizeof(dsa_pointer) * (1 << new_size_log2)); + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + { + dht_partition *partition = &hash_table->control->partitions[i]; + + partition->old_count = partition->count; + partition->count = 0; + partition->next_bucket_to_split = + BUCKET_INDEX_FOR_PARTITION(i, new_size_log2 - 1); + } + } + else + { + /* Out of memory. */ + result = false; + } + + /* Release all the locks. */ + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + LWLockRelease(PARTITION_LOCK(hash_table, i)); + + return result; +} + +/* + * Check if the resize operation is finished. If it is, then the old table + * can be returned to the shared memory area. + */ +static void +finish_resize(dht_hash_table *hash_table) +{ + int i, + j; + bool finished = true; + + for (i = 0; i < DHT_NUM_PARTITIONS; ++i) + { + dht_partition *partition = &hash_table->control->partitions[i]; + + LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE); + if (i == 0) + { + ensure_valid_bucket_pointers(hash_table); + if (!RESIZE_IN_PROGRESS(hash_table)) + { + /* Another backend has already finished the resize. */ + LWLockRelease(PARTITION_LOCK(hash_table, i)); + return; + } + } + + /* + * We can only actually finish the resize when every partition has no + * more keys in old buckets. This might not be true yet, because + * search_for_resize_work calls us when it thinks there might be no + * more resizing work to do, but it could be wrong because it judges + * that with an unsynchronized view of memory. Now that we can + * examine the synchronized counters, we can check if the resize + * really is complete. + */ + if (partition->old_count > 0) + { + finished = false; + break; + } + } + + if (finished) + { + dsa_free(hash_table->area, hash_table->control->old_buckets); + hash_table->control->old_buckets = InvalidDsaPointer; + } + + /* + * Only release as many locks as we acquired. When finished is true, this + * will be all of them, but if we exited the lock-acquire loop early it + * might be fewer. + */ + for (j = 0; j < i; ++j) + LWLockRelease(PARTITION_LOCK(hash_table, j)); +} + +/* + * Migrate one item from partition 'p' from an old bucket to the appropriate + * new bucket. The partition lock must already be held and there must be at + * least one item to be migrated. + */ +static void +do_incremental_resize(dht_hash_table *hash_table, int p) +{ + Size old_size_log2 = hash_table->control->size_log2 - 1; + dht_partition *partition = &hash_table->control->partitions[p]; + + Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, p))); + Assert(partition->old_count > 0); + + /* + * Walk through all the buckets that are covered by this partition until + * we find one that is not empty, and migrate one item from it. + */ + for (;;) + { + dsa_pointer item_pointer; + + item_pointer = + hash_table->old_buckets[partition->next_bucket_to_split]; + + if (DsaPointerIsValid(item_pointer)) + { + dht_hash_table_item *item; + + item = dsa_get_address(hash_table->area, item_pointer); + + /* Remove it from the old bucket. */ + hash_table->old_buckets[partition->next_bucket_to_split] = + item->next; + --partition->old_count; + + /* Add it to the active bucket. */ + insert_item_into_bucket(hash_table, + item_pointer, + item, + &BUCKET_FOR_HASH(hash_table, item->hash)); + ++partition->count; + + return; + } + else + { + /* Move to the next bucket protected by this partition. */ + partition->next_bucket_to_split += 1; + + /* + * We shouldn't be able to reach past the end of the partition + * without finding an item. That would imply that the partition's + * old_count was corrupted. + * + * This is an elog() because we want to catch this even in builds + * that are not Assert-enabled. + */ + if (partition->next_bucket_to_split >= + BUCKET_INDEX_FOR_PARTITION(p + 1, old_size_log2)) + elog(ERROR, "dht corrupted"); + } + } +} + +/* + * Try to find a partition that has outstanding resizing work and do some of + * that work, possibly completing the resize operation. The caller must hold + * no locks. + * + * Consider 'start_partition' first. dht_release tries to spread this around + * to try to reduce contention. + */ +static void +search_for_resize_work(dht_hash_table *hash_table, int start_partition) +{ + int i = start_partition; + + Assert(start_partition >= 0); + Assert(start_partition < DHT_NUM_PARTITIONS); + + /* + * Using a non-synchronized view of memory (no locks), see if we can find + * any partition that looks like it still has keys to be migrated. + */ + do + { + dht_partition *partition = &hash_table->control->partitions[i]; + + if (partition->old_count > 0) + { + bool done = false; + + /* Now check again with a lock. */ + LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE); + ensure_valid_bucket_pointers(hash_table); + if (!RESIZE_IN_PROGRESS(hash_table)) + { + /* The resize finished underneath us. */ + done = true; + } + else if (partition->old_count > 0) + { + /* There is in fact work to be done in this partition. */ + do_incremental_resize(hash_table, i); + done = true; + } + LWLockRelease(PARTITION_LOCK(hash_table, i)); + + if (done) + return; + } + i = (i + 1) % DHT_NUM_PARTITIONS; + } while (i != start_partition); + + /* + * We didn't find any partitions that seem to have remaining migration + * work, though we can't be 100% sure due to unsynchronized reads of + * memory. We will try to finish the resize, which may or may not + * actually succeed, depending on what it finds the true state to be. + */ + finish_resize(hash_table); +} + +/* + * Make sure that our backend-local bucket pointers are up to date. The + * caller must have locked one lock partition, which prevents begin_resize() + * from running concurrently. + */ +static inline void +ensure_valid_bucket_pointers(dht_hash_table *hash_table) +{ + if (hash_table->size_log2 != hash_table->control->size_log2) + { + /* + * A resize has started since we last looked, so our pointers are out + * of date. + */ + hash_table->buckets = dsa_get_address(hash_table->area, + hash_table->control->buckets); + if (DsaPointerIsValid(hash_table->control->old_buckets)) + hash_table->old_buckets = + dsa_get_address(hash_table->area, + hash_table->control->old_buckets); + else + hash_table->old_buckets = NULL; + hash_table->size_log2 = hash_table->control->size_log2; + } + if (hash_table->old_buckets && + !DsaPointerIsValid(hash_table->control->old_buckets)) + { + /* A resize has finished since we last looked. */ + hash_table->old_buckets = NULL; + } +} + +/* + * Scan a locked bucket for a match, using the provided compare-function. + */ +static inline dht_hash_table_item * +find_in_bucket(dht_hash_table *hash_table, const void *key, + dsa_pointer item_pointer) +{ + while (DsaPointerIsValid(item_pointer)) + { + dht_hash_table_item *item; + + item = dsa_get_address(hash_table->area, item_pointer); + if (hash_table->params.compare_function(key, &item->entry, + hash_table->params.key_size) == 0) + return item; + item_pointer = item->next; + } + return NULL; +} + +/* + * Insert an already-allocated item into a bucket. + */ +static void +insert_item_into_bucket(dht_hash_table *hash_table, + dsa_pointer item_pointer, + dht_hash_table_item * item, + dsa_pointer *bucket) +{ + Assert(item == dsa_get_address(hash_table->area, item_pointer)); + + /* + * Find the insertion point for this item in the bucket. Buckets are + * sorted by descending dsa_pointer, as part of the protocol for ensuring + * that iterators don't visit items twice when buckets are split during a + * scan. (We don't actually expect them to have more than 1 item unless + * the hash function is of low quality.) + */ + while (*bucket > item_pointer) + { + dht_hash_table_item *this_item; + + this_item = dsa_get_address(hash_table->area, *bucket); + + bucket = &this_item->next; + } + + item->next = *bucket; + *bucket = item_pointer; +} + +/* + * Allocate space for an entry with the given key and insert it into the + * provided bucket. + */ +static dht_hash_table_item * +insert_into_bucket(dht_hash_table *hash_table, + const void *key, + dsa_pointer *bucket) +{ + dsa_pointer item_pointer; + dht_hash_table_item *item; + + item_pointer = dsa_allocate(hash_table->area, + hash_table->params.entry_size + + offsetof(dht_hash_table_item, entry)); + if (!DsaPointerIsValid(item_pointer)) + /* Out of memory. */ + return NULL; + + item = dsa_get_address(hash_table->area, item_pointer); + memcpy(&(item->entry), key, hash_table->params.key_size); + insert_item_into_bucket(hash_table, item_pointer, item, bucket); + return item; +} + +/* + * Search a bucket for a matching key and delete it. + */ +static bool +delete_key_from_bucket(dht_hash_table *hash_table, + const void *key, + dsa_pointer *bucket_head) +{ + while (DsaPointerIsValid(*bucket_head)) + { + dht_hash_table_item *item; + + item = dsa_get_address(hash_table->area, *bucket_head); + + if (hash_table->params.compare_function(key, &item->entry, + hash_table->params.key_size) + == 0) + { + dsa_pointer next; + + next = item->next; + dsa_free(hash_table->area, *bucket_head); + *bucket_head = next; + return true; + } + bucket_head = &item->next; + } + return false; +} + +/* + * Delete the specified item from the bucket. + */ +static bool +delete_item_from_bucket(dht_hash_table *hash_table, + dht_hash_table_item * item, + dsa_pointer *bucket_head) +{ + while (DsaPointerIsValid(*bucket_head)) + { + dht_hash_table_item *bucket_item; + + bucket_item = dsa_get_address(hash_table->area, *bucket_head); + + if (bucket_item == item) + { + dsa_pointer next; + + next = item->next; + dsa_free(hash_table->area, *bucket_head); + *bucket_head = next; + return true; + } + bucket_head = &bucket_item->next; + } + return false; +} diff --git a/src/include/storage/dht.h b/src/include/storage/dht.h new file mode 100644 index 0000000..ccf9d17 --- /dev/null +++ b/src/include/storage/dht.h @@ -0,0 +1,117 @@ +/*------------------------------------------------------------------------- + * + * dht.h + * Concurrent hash tables backed by dynamic shared memory areas. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/storage/dht.h + * + *------------------------------------------------------------------------- + */ +#ifndef DHT_H +#define DHT_H + +#include "postgres.h" + +#include "storage/dsa.h" + +/* The opaque type representing a hash table. */ +struct dht_hash_table; +typedef struct dht_hash_table dht_hash_table; + +/* The opaque type used for tracking iterator state. */ +struct dht_iterator; +typedef struct dht_iterator dht_iterator; + +/* A handle for a dht_hash_table which can be shared with other processes. */ +typedef dsa_pointer dht_hash_table_handle; + +/* The type for hash values. */ +typedef uint32 dht_hash; + +/* + * The function type for computing hash values for keys. Compatible with + * HashValueFunc in hsearch.h and hash functions like tag_hash. + */ +typedef dht_hash (*dht_hash_function)(const void *v, Size size); + +/* + * The function used for comparing keys. Compatible with HashCompareFunction + * in hsearch.h and memcmp. + */ +typedef int (*dht_compare_function)(const void *a, const void *b, Size size); + +/* + * The set of parameters needed to create or attach to a hash table. The + * members tranche_id and tranche_name do not need to be initialized when + * attaching to an existing hash table. + */ +typedef struct +{ + Size key_size; /* Size of the key (initial bytes of entry) */ + Size entry_size; /* Total size of entry */ + dht_compare_function compare_function; /* Compare function */ + dht_hash_function hash_function; /* Hash function */ + int tranche_id; /* The tranche ID to use for locks. */ + char tranche_name[NAMEDATALEN]; /* The tranche name to use for locks. */ +} dht_parameters; + +/* Forward declaration of private types for use only by dht.c. */ +struct dht_hash_table_bucket; +struct dht_hash_table_item; +typedef struct dht_hash_table_item dht_hash_table_item; +typedef struct dht_hash_table_bucket dht_hash_table_bucket; + +/* + * The state used to track a walk over all entries in a hash table. The + * members of this struct are only for use by code in dht.c, but it is + * included in the header because it's useful to be able to create objects of + * this type on the stack to pass into the dht_iterator_XXX functions. + */ +struct dht_iterator +{ + dht_hash_table *hash_table; /* The hash table we are iterating over. */ + bool exclusive; /* Whether to lock buckets exclusively. */ + Size partition; /* The index of the next partition to visit. */ + Size bucket; /* The index of the next bucket to visit. */ + dht_hash_table_item *item; /* The most recently returned item. */ + dsa_pointer last_item_pointer; /* The last item visited. */ + Size table_size_log2; /* The table size when we started iterating. */ + bool locked; /* Whether the current partition is locked. */ +}; + +/* Creating, sharing and destroying from hash tables. */ +extern dht_hash_table *dht_create(dsa_area *area, + const dht_parameters *params); +extern dht_hash_table *dht_attach(dsa_area *area, + const dht_parameters *params, + dht_hash_table_handle handle); +extern void dht_detach(dht_hash_table *hash_table); +extern dht_hash_table_handle +dht_get_hash_table_handle(dht_hash_table *hash_table); +extern void dht_destroy(dht_hash_table *hash_table); + +/* Iterating over the whole hash table. */ +extern void dht_iterate_begin(dht_hash_table *hash_table, + dht_iterator *iterator, bool exclusive); +extern void *dht_iterate_next(dht_iterator *iterator); +extern void dht_iterate_delete(dht_iterator *iterator); +extern void dht_iterate_release(dht_iterator *iterator); +extern void dht_iterate_end(dht_iterator *iterator); + +/* Finding, creating, deleting entries. */ +extern void *dht_find(dht_hash_table *hash_table, + const void *key, bool exclusive); +extern void *dht_find_or_insert(dht_hash_table *hash_table, + const void *key, bool *found); +extern bool dht_delete_key(dht_hash_table *hash_table, const void *key); +extern void dht_delete_entry(dht_hash_table *hash_table, void *entry); +extern void dht_release(dht_hash_table *hash_table, void *entry); + +/* Debugging support. */ +extern void dht_dump(dht_hash_table *hash_table); + +#endif /* DHT_H */