-/*
-URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb
-Rev: 22080
-Last Changed: 2007-04-03 05:08:18 -0400
+/*
+URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
+Rev: 23590
+Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
*/
- /*
+ /*
trivial database library - standalone version
Copyright (C) Andrew Tridgell 1999-2005
Copyright (C) Jeremy Allison 2000-2006
Copyright (C) Paul `Rusty' Russell 2000
-
+
** NOTE! The following LGPL license applies to the tdb
** library. This does NOT imply that all of Samba is released
** under the LGPL
-
+
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
#define HAVE_UTIME_H
#define HAVE_UTIME
#endif
-#define _XOPEN_SOURCE 500
+#ifndef __FreeBSD__
+#define _XOPEN_SOURCE 600
+#endif
+#include "config.h"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#endif
#endif
+typedef int bool;
+
#include "tdb.h"
+static TDB_DATA tdb_null;
+
#ifndef u32
#define u32 unsigned
#endif
-#ifndef HAVE_GETPAGESIZE
-#define getpagesize() 0x2000
-#endif
-
typedef u32 tdb_len_t;
typedef u32 tdb_off_t;
struct tdb_transaction *transaction;
int page_size;
int max_dead_records;
+ bool have_transaction_lock;
+ tdb_len_t real_map_size; /* how much space has been mapped */
};
static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
+static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
+static int tdb_transaction_unlock(struct tdb_context *tdb);
static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
struct list_struct *rec);
static void tdb_io_init(struct tdb_context *tdb);
static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
+static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
+ struct list_struct *rec);
/* file: error.c */
/* file: lock.c */
+#define TDB_MARK_LOCK 0x80000000
+
/* a byte range locking function - return 0 on success
this functions locks/unlocks 1 byte at the specified offset.
On error, errno is also set so that errors are passed back properly
- through tdb_open().
+ through tdb_open().
note that a len of zero means lock to end of file
*/
-int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
+int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
int rw_type, int lck_type, int probe, size_t len)
{
struct flock fl;
if (!probe && lck_type != F_SETLK) {
/* Ensure error code is set for log fun to examine. */
tdb->ecode = TDB_ERR_LOCK;
- TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
+ TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
tdb->fd, offset, rw_type, lck_type, (int)len));
}
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
upgrade a read lock to a write lock. This needs to be handled in a
special way as some OSes (such as solaris) have too conservative
deadlock detection and claim a deadlock when progress can be
- made. For those OSes we may loop for a while.
+ made. For those OSes we may loop for a while.
*/
int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
{
/* lock a list in the database. list -1 is the alloc list */
-int tdb_lock(struct tdb_context *tdb, int list, int ltype)
+static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
{
struct tdb_lock_type *new_lck;
int i;
+ bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
+
+ ltype &= ~TDB_MARK_LOCK;
/* a global lock allows us to avoid per chain locks */
- if (tdb->global_lock.count &&
+ if (tdb->global_lock.count &&
(ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
return 0;
}
}
if (list < -1 || list >= (int)tdb->header.hash_size) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
+ TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
list, ltype));
return -1;
}
/* Since fcntl locks don't nest, we do a lock for the first one,
and simply bump the count for future ones */
- if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW,
+ if (!mark_lock &&
+ tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
0, 1)) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
- "ltype=%d (%s)\n", list, ltype, strerror(errno)));
return -1;
}
return 0;
}
+/* lock a list in the database. list -1 is the alloc list */
+int tdb_lock(struct tdb_context *tdb, int list, int ltype)
+{
+ int ret;
+ ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
+ if (ret) {
+ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
+ "ltype=%d (%s)\n", list, ltype, strerror(errno)));
+ }
+ return ret;
+}
+
+/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
+int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
+{
+ return _tdb_lock(tdb, list, ltype, F_SETLK);
+}
+
+
/* unlock the database: returns void because it's too late for errors. */
/* changed to return int it may be interesting to know there
has been an error --simo */
int ret = -1;
int i;
struct tdb_lock_type *lck = NULL;
+ bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
+
+ ltype &= ~TDB_MARK_LOCK;
/* a global lock allows us to avoid per chain locks */
- if (tdb->global_lock.count &&
+ if (tdb->global_lock.count &&
(ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
return 0;
}
* anyway.
*/
- ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
- F_SETLKW, 0, 1);
+ if (mark_lock) {
+ ret = 0;
+ } else {
+ ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
+ F_SETLKW, 0, 1);
+ }
tdb->num_locks--;
/*
}
if (ret)
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
+ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
+ return ret;
+}
+
+/*
+ get the transaction lock
+ */
+int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
+{
+ if (tdb->have_transaction_lock || tdb->global_lock.count) {
+ return 0;
+ }
+ if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
+ F_SETLKW, 0, 1) == -1) {
+ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
+ tdb->ecode = TDB_ERR_LOCK;
+ return -1;
+ }
+ tdb->have_transaction_lock = 1;
+ return 0;
+}
+
+/*
+ release the transaction lock
+ */
+int tdb_transaction_unlock(struct tdb_context *tdb)
+{
+ int ret;
+ if (!tdb->have_transaction_lock) {
+ return 0;
+ }
+ ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ if (ret == 0) {
+ tdb->have_transaction_lock = 0;
+ }
return ret;
}
+
/* lock/unlock entire database */
-static int _tdb_lockall(struct tdb_context *tdb, int ltype)
+static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
{
+ bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
+
+ ltype &= ~TDB_MARK_LOCK;
+
/* There are no locks on read-only dbs */
if (tdb->read_only || tdb->traverse_read)
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
/* a global lock of a different type exists */
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
}
-
+
if (tdb->num_locks != 0) {
/* can't combine global and chain locks */
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
}
- if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW,
+ if (!mark_lock &&
+ tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
0, 4*tdb->header.hash_size)) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
+ if (op == F_SETLKW) {
+ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
+ }
return -1;
}
return 0;
}
+
+
/* unlock entire db */
static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
{
+ bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
+
+ ltype &= ~TDB_MARK_LOCK;
+
/* There are no locks on read-only dbs */
if (tdb->read_only || tdb->traverse_read) {
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
return 0;
}
- if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
+ if (!mark_lock &&
+ tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
0, 4*tdb->header.hash_size)) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
return -1;
/* lock entire database with write lock */
int tdb_lockall(struct tdb_context *tdb)
{
- return _tdb_lockall(tdb, F_WRLCK);
+ return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
+}
+
+/* lock entire database with write lock - mark only */
+int tdb_lockall_mark(struct tdb_context *tdb)
+{
+ return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
+}
+
+/* unlock entire database with write lock - unmark only */
+int tdb_lockall_unmark(struct tdb_context *tdb)
+{
+ return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
+}
+
+/* lock entire database with write lock - nonblocking varient */
+int tdb_lockall_nonblock(struct tdb_context *tdb)
+{
+ return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
}
/* unlock entire database with write lock */
/* lock entire database with read lock */
int tdb_lockall_read(struct tdb_context *tdb)
{
- return _tdb_lockall(tdb, F_RDLCK);
+ return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
+}
+
+/* lock entire database with read lock - nonblock varient */
+int tdb_lockall_read_nonblock(struct tdb_context *tdb)
+{
+ return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
}
/* unlock entire database with read lock */
return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
}
+/* lock/unlock one hash chain, non-blocking. This is meant to be used
+ to reduce contention - it cannot guarantee how many records will be
+ locked */
+int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
+{
+ return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
+}
+
+/* mark a chain as locked without actually locking it. Warning! use with great caution! */
+int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
+{
+ return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+}
+
+/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
+int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
+{
+ return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+}
+
int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
{
return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
/* check for an out of bounds access - if it is out of bounds then
see if the database has been expanded by someone else and expand
- if necessary
+ if necessary
note that "len" is the minimum length needed for the db
*/
static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
}
/* write a lump of data at a specified offset */
-static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
+static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
const void *buf, tdb_len_t len)
{
if (len == 0) {
/* read a lump of data at a specified offset, maybe convert */
-static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
+static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
tdb_len_t len, int cv)
{
if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
/*
do an unlocked scan of the hash table heads to find the next non-zero head. The value
will then be confirmed with the lock held
-*/
+*/
static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
{
u32 h = *chain;
#ifdef HAVE_MMAP
if (tdb->map_ptr) {
- int ret = munmap(tdb->map_ptr, tdb->map_size);
+ int ret = munmap(tdb->map_ptr, tdb->real_map_size);
if (ret != 0)
return ret;
+ tdb->real_map_size = 0;
}
#endif
tdb->map_ptr = NULL;
#ifdef HAVE_MMAP
if (!(tdb->flags & TDB_NOMMAP)) {
- tdb->map_ptr = mmap(NULL, tdb->map_size,
- PROT_READ|(tdb->read_only? 0:PROT_WRITE),
+ tdb->map_ptr = mmap(NULL, tdb->map_size,
+ PROT_READ|(tdb->read_only? 0:PROT_WRITE),
MAP_SHARED|MAP_FILE, tdb->fd, 0);
/*
*/
if (tdb->map_ptr == MAP_FAILED) {
+ tdb->real_map_size = 0;
tdb->map_ptr = NULL;
- TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
+ TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
tdb->map_size, strerror(errno)));
}
+ tdb->real_map_size = tdb->map_size;
} else {
tdb->map_ptr = NULL;
}
if (ftruncate(tdb->fd, size+addition) == -1) {
char b = 0;
if (pwrite(tdb->fd, &b, 1, (size+addition) - 1) != 1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
size+addition, strerror(errno)));
return -1;
}
int n = addition>sizeof(buf)?sizeof(buf):addition;
int ret = pwrite(tdb->fd, buf, n, size);
if (ret != n) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
n, strerror(errno)));
return -1;
}
- allow for nested calls to tdb_transaction_start(), re-using the
existing transaction record. If the inner transaction is cancelled
then a subsequent commit will fail
-
+
- keep a mirrored copy of the tdb hash chain heads to allow for the
fast hash heads scan on traverse, updating the mirrored copy in
the transaction version of tdb_write
read while in a transaction. We need to check first if the data is in our list
of transaction elements, then if not do a real read
*/
-static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
+static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
tdb_len_t len, int cv)
{
struct tdb_transaction_el *el;
len -= partial;
off += partial;
buf = (void *)(partial + (char *)buf);
-
+
if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
goto fail;
}
/*
write while in a transaction
*/
-static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
+static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
const void *buf, tdb_len_t len)
{
struct tdb_transaction_el *el, *best_el=NULL;
if (len == 0) {
return 0;
}
-
+
/* if the write is to a hash head, then update the transaction
hash heads */
if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
len -= partial;
off += partial;
buf = (const void *)(partial + (const char *)buf);
-
+
if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
goto fail;
}
}
/* see if we can append the new entry to an existing entry */
- if (best_el && best_el->offset + best_el->length == off &&
+ if (best_el && best_el->offset + best_el->length == off &&
(off+len < tdb->transaction->old_map_size ||
off > tdb->transaction->old_map_size)) {
unsigned char *data = best_el->data;
el = (struct tdb_transaction_el *)malloc(sizeof(*el));
if (el == NULL) {
tdb->ecode = TDB_ERR_OOM;
- tdb->transaction->transaction_error = 1;
+ tdb->transaction->transaction_error = 1;
return -1;
}
el->next = NULL;
if (el->data == NULL) {
free(el);
tdb->ecode = TDB_ERR_OOM;
- tdb->transaction->transaction_error = 1;
+ tdb->transaction->transaction_error = 1;
return -1;
}
if (buf) {
/*
transaction version of tdb_expand().
*/
-static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
+static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
tdb_off_t addition)
{
/* add a write to the transaction elements, so subsequent
/* cope with nested tdb_transaction_start() calls */
if (tdb->transaction != NULL) {
tdb->transaction->nesting++;
- TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
+ TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
tdb->transaction->nesting));
return 0;
}
/* get the transaction write lock. This is a blocking lock. As
discussed with Volker, there are a number of ways we could
make this async, which we will probably do in the future */
- if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
- tdb->ecode = TDB_ERR_LOCK;
+ if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
SAFE_FREE(tdb->transaction);
return -1;
}
-
+
/* get a read lock from the freelist to the end of file. This
is upgraded to a write lock during the commit */
if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
/* by calling this transaction write here, we ensure that we don't grow the
transaction linked list due to hash table updates */
- if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
+ if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
TDB_HASHTABLE_SIZE(tdb)) != 0) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
tdb->ecode = TDB_ERR_IO;
+ tdb->methods = tdb->transaction->io_methods;
goto fail;
}
return 0;
-
+
fail:
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
- tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_unlock(tdb);
SAFE_FREE(tdb->transaction->hash_heads);
SAFE_FREE(tdb->transaction);
return -1;
cancel the current transaction
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
-{
+{
if (tdb->transaction == NULL) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
return -1;
tdb->transaction->transaction_error = 1;
tdb->transaction->nesting--;
return 0;
- }
+ }
tdb->map_size = tdb->transaction->old_map_size;
F_UNLCK,F_SETLKW, 0, 1);
}
tdb->num_locks = 0;
+ tdb->num_lockrecs = 0;
+ SAFE_FREE(tdb->lockrecs);
}
/* restore the normal io methods */
tdb->methods = tdb->transaction->io_methods;
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
- tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_unlock(tdb);
SAFE_FREE(tdb->transaction->hash_heads);
SAFE_FREE(tdb->transaction);
-
+
return 0;
}
sync to disk
*/
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
-{
+{
if (fsync(tdb->fd) != 0) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
return -1;
}
-#ifdef MS_SYNC
+#if defined(HAVE_MSYNC) && defined(MS_SYNC)
if (tdb->map_ptr) {
tdb_off_t moffset = offset & ~(tdb->page_size-1);
- if (msync(moffset + (char *)tdb->map_ptr,
+ if (msync(moffset + (char *)tdb->map_ptr,
length + (offset - moffset), MS_SYNC) != 0) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
allocate the recovery area, or use an existing recovery area if it is
large enough
*/
-static int tdb_recovery_allocate(struct tdb_context *tdb,
+static int tdb_recovery_allocate(struct tdb_context *tdb,
tdb_len_t *recovery_size,
tdb_off_t *recovery_offset,
tdb_len_t *recovery_max_size)
rec.rec_len = 0;
- if (recovery_head != 0 &&
+ if (recovery_head != 0 &&
methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
return -1;
*recovery_offset = tdb->map_size;
recovery_head = *recovery_offset;
- if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
(tdb->map_size - tdb->transaction->old_map_size) +
sizeof(rec) + *recovery_max_size) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
/* write the recovery header offset and sync - we can sync without a race here
as the magic ptr in the recovery record has not been set */
CONVERT(recovery_head);
- if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
+ if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
&recovery_head, sizeof(tdb_off_t)) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
return -1;
/*
setup the recovery data that will be used on a crash during commit
*/
-static int transaction_setup_recovery(struct tdb_context *tdb,
+static int transaction_setup_recovery(struct tdb_context *tdb,
tdb_off_t *magic_offset)
{
struct tdb_transaction_el *el;
/*
check that the recovery area has enough space
*/
- if (tdb_recovery_allocate(tdb, &recovery_size,
+ if (tdb_recovery_allocate(tdb, &recovery_size,
&recovery_offset, &recovery_max_size) == -1) {
return -1;
}
commit the current transaction
*/
int tdb_transaction_commit(struct tdb_context *tdb)
-{
+{
const struct tdb_methods *methods;
tdb_off_t magic_offset = 0;
u32 zero = 0;
if (tdb->transaction->nesting != 0) {
tdb->transaction->nesting--;
return 0;
- }
+ }
/* check for a null transaction */
if (tdb->transaction->elements == NULL) {
}
methods = tdb->transaction->io_methods;
-
+
/* if there are any locks pending then the caller has not
nested their locks properly, so fail the transaction */
if (tdb->num_locks || tdb->global_lock.count) {
/* expand the file to the new size if needed */
if (tdb->map_size != tdb->transaction->old_map_size) {
- if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
- tdb->map_size -
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ tdb->map_size -
tdb->transaction->old_map_size) == -1) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
-
+
/* we've overwritten part of the data and
possibly expanded the file, so we need to
run the crash recovery code */
tdb->methods = methods;
- tdb_transaction_recover(tdb);
+ tdb_transaction_recover(tdb);
tdb_transaction_cancel(tdb);
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
return -1;
}
tdb->transaction->elements = el->next;
- free(el->data);
+ free(el->data);
free(el);
- }
+ }
if (!(tdb->flags & TDB_NOSYNC)) {
/* ensure the new data is on disk */
}
/* read the recovery record */
- if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+ if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
sizeof(rec), DOCONV()) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
tdb->ecode = TDB_ERR_IO;
return -1;
}
data = (unsigned char *)malloc(rec.data_len);
if (data == NULL) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
tdb->ecode = TDB_ERR_OOM;
return -1;
}
/* read the full recovery data */
if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
rec.data_len, 0) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
tdb->ecode = TDB_ERR_IO;
return -1;
}
if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
tdb->ecode = TDB_ERR_IO;
- return -1;
+ return -1;
}
}
/* remove the recovery magic */
- if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
+ if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
&zero) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
tdb->ecode = TDB_ERR_IO;
- return -1;
+ return -1;
}
-
+
/* reduce the file size to the old size */
tdb_munmap(tdb);
if (ftruncate(tdb->fd, recovery_eof) != 0) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
tdb->ecode = TDB_ERR_IO;
- return -1;
+ return -1;
}
tdb->map_size = recovery_eof;
tdb_mmap(tdb);
return -1;
}
- TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
+ TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
recovery_eof));
/* all done */
/* file: freelist.c */
/* read a freelist record and check for simple errors */
-static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
+static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
{
if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
return -1;
if (rec->magic == TDB_MAGIC) {
/* this happens when a app is showdown while deleting a record - we should
not completely fail when this happens */
- TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
+ TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
rec->magic, off));
rec->magic = TDB_FREE_MAGIC;
if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
if (rec->magic != TDB_FREE_MAGIC) {
/* Ensure ecode is set for log fn. */
tdb->ecode = TDB_ERR_CORRUPT;
- TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read bad magic 0x%x at offset=%d\n",
+ TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
rec->magic, off));
return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
}
if (left > TDB_DATA_START(tdb->header.hash_size)) {
struct list_struct l;
tdb_off_t leftsize;
-
+
/* Read in tailer and jump back to header */
if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
}
-/*
+/*
the core of tdb_allocate - called when we have decided which
free list entry to use
*/
if (rec->rec_len > length + MIN_REC_SIZE) {
/* Length of left piece */
length = TDB_ALIGN(length, TDB_ALIGNMENT);
-
+
/* Right piece to go on free list */
newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
newrec_ptr = rec_ptr + sizeof(*rec) + length;
-
+
/* And left record is shortened */
rec->rec_len = length;
} else {
newrec_ptr = 0;
}
-
+
/* Remove allocated record from the free list */
if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
return 0;
}
-
+
/* Update header: do this before we drop alloc
lock, otherwise tdb_free() might try to
merge with us, thinking we're free.
if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
return 0;
}
-
+
/* Did we create new block? */
if (newrec_ptr) {
/* Update allocated record tailer (we
if (update_tailer(tdb, rec_ptr, rec) == -1) {
return 0;
}
-
+
/* Free new record */
if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
return 0;
}
}
-
+
/* all done - return the new record offset */
return rec_ptr;
}
bestfit.last_ptr = 0;
bestfit.rec_len = 0;
- /*
+ /*
this is a best fit allocation strategy. Originally we used
a first fit strategy, but it suffered from massive fragmentation
issues when faced with a slowly increasing record size.
*/
while (rec_ptr) {
- if (rec_free_read(tdb, rec_ptr, rec) == -1) {
+ if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
goto fail;
}
}
if (bestfit.rec_ptr != 0) {
- if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
+ if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
goto fail;
}
goto fail;
}
- if (rec_free_read(tdb, rec_ptr, &rec) == -1) {
+ if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
goto fail;
}
common for the use of tdb with ldb, where large
hashes are used. In that case we spend most of our
time in tdb_brlock(), locking empty hash chains.
-
+
To avoid this, we do an unlocked pre-check to see
if the hash chain is empty before starting to look
inside it. If it is empty then we can avoid that
the value we get back, as we read it without a
lock, so instead we get the lock and re-fetch the
value below.
-
+
Notice that not doing this optimisation on the
first hash chain is critical. We must guarantee
that we have done at least one fcntl lock at the
could possibly miss those with this trick, but we
could miss them anyway without this trick, so the
semantics don't change.
-
+
With a non-indexed ldb search this trick gains us a
factor of around 80 in speed on a linux 2.6.x
system (testing using ldbtest).
/* Try to clean dead ones from old traverses */
current = tlock->off;
tlock->off = rec->next;
- if (!(tdb->read_only || tdb->traverse_read) &&
+ if (!(tdb->read_only || tdb->traverse_read) &&
tdb_do_delete(tdb, current, rec) != 0)
goto fail;
}
if fn is NULL then it is not called
a non-zero return value from fn() indicates that the traversal should stop
*/
-static int tdb_traverse_internal(struct tdb_context *tdb,
+static int tdb_traverse_internal(struct tdb_context *tdb,
tdb_traverse_func fn, void *private_data,
struct tdb_traverse_lock *tl)
{
while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
count++;
/* now read the full record */
- key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
+ key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
rec.key_len + rec.data_len);
if (!key.dptr) {
ret = -1;
/*
a write style traverse - temporarily marks the db read only
*/
-int tdb_traverse_read(struct tdb_context *tdb,
+int tdb_traverse_read(struct tdb_context *tdb,
tdb_traverse_func fn, void *private_data)
{
struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
int ret;
-
+
/* we need to get a read lock on the transaction lock here to
cope with the lock ordering semantics of solaris10 */
- if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n"));
- tdb->ecode = TDB_ERR_LOCK;
+ if (tdb_transaction_lock(tdb, F_RDLCK)) {
return -1;
}
ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
tdb->traverse_read--;
- tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_unlock(tdb);
return ret;
}
a write style traverse - needs to get the transaction lock to
prevent deadlocks
*/
-int tdb_traverse(struct tdb_context *tdb,
+int tdb_traverse(struct tdb_context *tdb,
tdb_traverse_func fn, void *private_data)
{
struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
if (tdb->read_only || tdb->traverse_read) {
return tdb_traverse_read(tdb, fn, private_data);
}
-
- if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n"));
- tdb->ecode = TDB_ERR_LOCK;
+
+ if (tdb_transaction_lock(tdb, F_WRLCK)) {
return -1;
}
ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
- tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_unlock(tdb);
return ret;
}
tdb->travlocks.off = tdb->travlocks.hash = 0;
tdb->travlocks.lock_rw = F_RDLCK;
+ /* Grab first record: locks chain and returned record. */
if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
return tdb_null;
/* now read the key */
key.dsize = rec.key_len;
key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
- if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
+
+ /* Unlock the hash chain of the record we just read. */
+ if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
return key;
}
/* Is locked key the old key? If so, traverse will be reliable. */
if (tdb->travlocks.off) {
- if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
+ if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
return tdb_null;
if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
|| !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
SAFE_FREE(k);
return tdb_null;
}
- if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0) {
+ if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
SAFE_FREE(k);
return tdb_null;
}
if (!tdb->travlocks.off) {
/* No previous element: do normal find, and lock record */
- tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
+ tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
if (!tdb->travlocks.off)
return tdb_null;
tdb->travlocks.hash = BUCKET(rec.full_hash);
key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
key.dsize);
/* Unlock the chain of this new record */
- if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
+ if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
}
/* Unlock the chain of old record */
- if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
+ if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
return key;
}
/* file: dump.c */
-static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
+static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
+ tdb_off_t offset)
{
struct list_struct rec;
tdb_off_t tailer_ofs, tailer;
- if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
+ if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
sizeof(rec), DOCONV()) == -1) {
printf("ERROR: failed to read record at %u\n", offset);
return 0;
}
- printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
- offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
+ printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
+ "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
+ hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
+ rec.full_hash, rec.magic);
tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
printf("hash=%d\n", i);
while (rec_ptr) {
- rec_ptr = tdb_dump_record(tdb, rec_ptr);
+ rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
}
return tdb_unlock(tdb, i, F_WRLCK);
printf("freelist top=[0x%08x]\n", rec_ptr );
while (rec_ptr) {
- if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
+ if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
sizeof(rec), DOCONV()) == -1) {
tdb_unlock(tdb, -1, F_WRLCK);
return -1;
return -1;
}
- printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
+ printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
total_free += rec.rec_len;
/* move to the next record */
rec_ptr = rec.next;
}
- printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
+ printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
(int)total_free);
return tdb_unlock(tdb, -1, F_WRLCK);
/* file: tdb.c */
-TDB_DATA tdb_null;
-
/*
- increment the tdb sequence number if the tdb has been opened using
+ non-blocking increment of the tdb sequence number if the tdb has been opened using
the TDB_SEQNUM flag
*/
-static void tdb_increment_seqnum(struct tdb_context *tdb)
+void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
{
tdb_off_t seqnum=0;
-
- if (!(tdb->flags & TDB_SEQNUM)) {
- return;
- }
- if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
+ if (!(tdb->flags & TDB_SEQNUM)) {
return;
}
tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
seqnum++;
tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
+}
+
+/*
+ increment the tdb sequence number if the tdb has been opened using
+ the TDB_SEQNUM flag
+*/
+static void tdb_increment_seqnum(struct tdb_context *tdb)
+{
+ if (!(tdb->flags & TDB_SEQNUM)) {
+ return;
+ }
+
+ if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
+ return;
+ }
+
+ tdb_increment_seqnum_nonblock(tdb);
tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
}
struct list_struct *r)
{
tdb_off_t rec_ptr;
-
+
/* read in the hash top */
if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
return 0;
rec.data_len = dbuf.dsize;
return tdb_rec_write(tdb, rec_ptr, &rec);
}
-
+
return 0;
}
return ret;
}
-/* check if an entry in the database exists
+/* check if an entry in the database exists
note that 1 is returned if the key is found and 0 is returned if not found
this doesn't match the conventions in the rest of this module, but is
static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
{
struct list_struct rec;
-
+
if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
return 0;
tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
int res = 0;
tdb_off_t rec_ptr;
struct list_struct rec;
-
+
/* read in the hash top */
if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
return 0;
if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
return -1;
}
-
+
/* read in the hash top */
if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
goto fail;
struct list_struct *r, tdb_len_t length)
{
tdb_off_t rec_ptr;
-
+
/* read in the hash top */
if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
return 0;
}
/* store an element in the database, replacing any existing element
- with the same key
+ with the same key
return 0 on success, -1 on failure
*/
tdb_increment_seqnum(tdb);
}
- SAFE_FREE(p);
+ SAFE_FREE(p);
tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
return ret;
}
if (dbuf.dptr == NULL) {
dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
} else {
- dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
+ unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
dbuf.dsize + new_dbuf.dsize);
+ if (new_dptr == NULL) {
+ free(dbuf.dptr);
+ }
+ dbuf.dptr = new_dptr;
}
if (dbuf.dptr == NULL) {
dbuf.dsize += new_dbuf.dsize;
ret = tdb_store(tdb, key, dbuf, 0);
-
+
failed:
tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
SAFE_FREE(dbuf.dptr);
return tdb->flags;
}
+
+/*
+ enable sequence number handling on an open tdb
+*/
+void tdb_enable_seqnum(struct tdb_context *tdb)
+{
+ tdb->flags |= TDB_SEQNUM;
+}
+
/* file: open.c */
/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
static struct tdb_context *tdbs = NULL;
-/* This is based on the hash algorithm from gdbm */
+/* This is from a hash algorithm suggested by Rogier Wolff */
static unsigned int default_tdb_hash(TDB_DATA *key)
{
u32 value; /* Used to compute the hash value. */
u32 i; /* Used to cycle through random values. */
/* Set the initial value from the key size. */
- for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
- value = (value + (key->dptr[i] << (i*5 % 24)));
+ for (value = 0, i=0; i < key->dsize; i++)
+ value = value * 256 + key->dptr[i] + (value >> 24) * 241;
- return (1103515243 * value + 12345);
+ return value;
}
ino_t ino)
{
struct tdb_context *i;
-
+
for (i = tdbs; i; i = i->next) {
if (i->device == device && i->inode == ino) {
return 1;
return 0;
}
-/* open the database, creating it if necessary
+/* open the database, creating it if necessary
The open_flags and mode are passed straight to the open call on the
database file. A flags value of O_WRONLY is invalid. The hash size
is advisory, use zero for a default value.
- Return is NULL on error, in which case errno is also set. Don't
+ Return is NULL on error, in which case errno is also set. Don't
try to call tdb_error or tdb_errname, just do strerror(errno).
@param name may be NULL for internal databases. */
tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
/* cache the page size */
- tdb->page_size = getpagesize();
+ tdb->page_size = sysconf(_SC_PAGESIZE);
if (tdb->page_size <= 0) {
tdb->page_size = 0x2000;
}
errno = EINVAL;
goto fail;
}
-
+
if (hash_size == 0)
hash_size = DEFAULT_HASH_SIZE;
if ((open_flags & O_ACCMODE) == O_RDONLY) {
if (!tdb)
return NULL;
-
+
if (tdb->map_ptr) {
if (tdb->flags & TDB_INTERNAL)
SAFE_FREE(tdb->map_ptr);
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
goto fail;
}
- if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
+ if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
(tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
goto fail;
return 0;
}
+
+/**
+ * Flush a database file from the page cache.
+ **/
+int tdb_flush(struct tdb_context *tdb)
+{
+ if (tdb->fd != -1)
+ return fsync(tdb->fd);
+ return 0;
+}