2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
7 trivial database library - standalone version
9 Copyright (C) Andrew Tridgell 1999-2005
10 Copyright (C) Jeremy Allison 2000-2006
11 Copyright (C) Paul `Rusty' Russell 2000
13 ** NOTE! The following LGPL license applies to the tdb
14 ** library. This does NOT imply that all of Samba is released
17 This library is free software; you can redistribute it and/or
18 modify it under the terms of the GNU Lesser General Public
19 License as published by the Free Software Foundation; either
20 version 2 of the License, or (at your option) any later version.
22 This library is distributed in the hope that it will be useful,
23 but WITHOUT ANY WARRANTY; without even the implied warranty of
24 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25 Lesser General Public License for more details.
27 You should have received a copy of the GNU Lesser General Public
28 License along with this library; if not, write to the Free Software
29 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
/* Standalone-build configuration: when built outside the Samba tree the
 * required HAVE_* feature macros and replacement helpers are hard-wired
 * here.  NOTE(review): this extract is elided -- original lines are
 * missing between those shown, so the fragments below are incomplete. */
32 #ifdef CONFIG_STAND_ALONE
35 #define HAVE_SYS_MMAN_H
39 #define _XOPEN_SOURCE 600
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
53 #include <sys/types.h>
62 #ifdef HAVE_SYS_MMAN_H
/* mmap() failure sentinel for platforms whose headers do not define it */
71 #define MAP_FAILED ((void *)-1)
/* strdup replacement for platforms that lack it.  Only part of the body
 * is visible; presumably the elided lines NUL-terminate the copy and
 * handle a NULL argument -- confirm against the full source. */
75 #define strdup rep_strdup
76 static char *rep_strdup(const char *s)
86 ret = malloc(length + 1);
88 strncpy(ret, s, length);
#ifndef PRINTF_ATTRIBUTE
/** Use the gcc format attribute to type-check printf-style functions.
 * a1 is the 1-based index of the parameter containing the format
 * string, and a2 the index of the first variadic argument.  The
 * attribute is usable from gcc 3.1 onwards; the version test must be a
 * proper (major, minor) comparison -- the naive form
 * (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1) wrongly rejects releases
 * such as gcc 4.0 whose __GNUC_MINOR__ is 0. */
#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
#else
#define PRINTF_ATTRIBUTE(a1, a2)
#endif
#endif
/* The canonical "no data" TDB_DATA value (static, so zero-initialised). */
111 static TDB_DATA tdb_null;
/* All on-disk lengths and offsets are fixed 32-bit quantities. */
117 typedef u32 tdb_len_t;
118 typedef u32 tdb_off_t;
/* Fall back to a hand-rolled offsetof only when <stddef.h> has not
 * already provided one: unconditionally redefining a standard macro
 * collides with the C library definition.  The null-pointer form is
 * the traditional pre-C89 idiom; the (unsigned int) cast truncates on
 * LP64 but is harmless for the small structs used here. */
#ifndef offsetof
#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
#endif
/* On-disk format identification and layout constants. */
124 #define TDB_MAGIC_FOOD "TDB file\n"
125 #define TDB_VERSION (0x26011967 + 6)
126 #define TDB_MAGIC (0x26011999U)
127 #define TDB_FREE_MAGIC (~TDB_MAGIC)
128 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
129 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
130 #define TDB_ALIGNMENT 4
131 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
132 #define DEFAULT_HASH_SIZE 131
/* the freelist head sits immediately after the header */
133 #define FREELIST_TOP (sizeof(struct tdb_header))
/* round x up to a multiple of a (a must be a power of two) */
134 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
/* 32-bit byte swap, used for cross-endian databases */
135 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
136 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
137 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
/* file offset of a hash chain head; bucket 0 of the table is the freelist */
138 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
139 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
140 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
141 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
142 #define TDB_SEQNUM_OFS offsetof(struct tdb_header, sequence_number)
/* pad bytes written when pre-filling file space */
143 #define TDB_PAD_BYTE 0x42
144 #define TDB_PAD_U32 0x42424242
146 /* NB assumes there is a local variable called "tdb" that is the
147 * current context, also takes doubly-parenthesized print-style
149 #define TDB_LOG(x) tdb->log.log_fn x
/* fixed byte offsets used for whole-file fcntl locks */
152 #define GLOBAL_LOCK 0
153 #define ACTIVE_LOCK 4
154 #define TRANSACTION_LOCK 8
156 /* free memory if the pointer is valid and zero the pointer */
158 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
/* map a full 32-bit hash onto a hash-table bucket index */
161 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
/* endian conversion helpers: only active when TDB_CONVERT is set */
163 #define DOCONV() (tdb->flags & TDB_CONVERT)
164 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
167 /* the body of the database is made of one list_struct for the free space
168 plus a separate data list for each hash value */
/* NOTE(review): the "struct list_struct {" and closing "};" lines are
 * elided from this extract; only the record-header members are visible. */
170 tdb_off_t next; /* offset of the next record in the list */
171 tdb_len_t rec_len; /* total byte length of record */
172 tdb_len_t key_len; /* byte length of key */
173 tdb_len_t data_len; /* byte length of data */
174 u32 full_hash; /* the full 32 bit hash of the key */
175 u32 magic; /* try to catch errors */
176 /* the following union is implied:
178 char record[rec_len];
183 u32 totalsize; (tailer)
189 /* this is stored at the front of every database */
/* NOTE(review): the "struct tdb_header {" wrapper is likewise elided. */
191 char magic_food[32]; /* for /etc/magic */
192 u32 version; /* version of the code */
193 u32 hash_size; /* number of hash entries */
194 tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
195 tdb_off_t recovery_start; /* offset of transaction recovery region */
196 tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
197 tdb_off_t reserved[29];
/* Per-chain lock bookkeeping and the pluggable I/O method table.
 * NOTE(review): several member lines and the closing braces are elided
 * from this extract. */
200 struct tdb_lock_type {
206 struct tdb_traverse_lock {
207 struct tdb_traverse_lock *next;
/* tdb_methods: indirection for all raw file I/O so that a transaction
 * can interpose its own read/write implementations */
215 int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
216 int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
217 void (*next_hash_chain)(struct tdb_context *, u32 *);
218 int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
219 int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
220 int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
/* struct tdb_context members: one open database handle */
224 char *name; /* the name of the database */
225 void *map_ptr; /* where it is currently mapped */
226 int fd; /* open file descriptor for the database */
227 tdb_len_t map_size; /* how much space has been mapped */
228 int read_only; /* opened read-only */
229 int traverse_read; /* read-only traversal */
230 struct tdb_lock_type global_lock;
232 struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
233 enum TDB_ERROR ecode; /* error code for last tdb error */
234 struct tdb_header header; /* a cached copy of the header */
235 u32 flags; /* the flags passed to tdb_open */
236 struct tdb_traverse_lock travlocks; /* current traversal locks */
237 struct tdb_context *next; /* all tdbs to avoid multiple opens */
238 dev_t device; /* uniquely identifies this tdb */
239 ino_t inode; /* uniquely identifies this tdb */
240 struct tdb_logging_context log;
241 unsigned int (*hash_fn)(TDB_DATA *key);
242 int open_flags; /* flags used in the open - needed by reopen */
243 unsigned int num_locks; /* number of chain locks held */
244 const struct tdb_methods *methods;
245 struct tdb_transaction *transaction;
247 int max_dead_records;
248 bool have_transaction_lock;
/* Forward declarations of the internal mapping, locking and record
 * helpers used throughout this file. */
255 static int tdb_munmap(struct tdb_context *tdb);
256 static void tdb_mmap(struct tdb_context *tdb);
257 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
258 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
259 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
260 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
261 static int tdb_transaction_unlock(struct tdb_context *tdb);
262 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
263 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
264 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
265 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static void *tdb_convert(void *buf, u32 size);
268 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
269 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
270 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
271 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
273 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
274 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
275 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
/* Forward declarations of the record-level helpers.
 * NOTE(review): the tdb_parse_data prototype is truncated here -- its
 * trailing parameter lines are elided from this extract. */
276 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
277 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
278 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
279 tdb_off_t offset, tdb_len_t len,
280 int (*parser)(TDB_DATA key, TDB_DATA data,
283 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
284 struct list_struct *rec);
285 static void tdb_io_init(struct tdb_context *tdb);
286 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
287 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
288 struct list_struct *rec);
/* Return the error code of the last operation on this tdb.
 * NOTE(review): the body of tdb_error is elided from this extract. */
293 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
/* table mapping each TDB_ERROR code to a human-readable string */
298 static struct tdb_errname {
299 enum TDB_ERROR ecode; const char *estring;
300 } emap[] = { {TDB_SUCCESS, "Success"},
301 {TDB_ERR_CORRUPT, "Corrupt database"},
302 {TDB_ERR_IO, "IO Error"},
303 {TDB_ERR_LOCK, "Locking error"},
304 {TDB_ERR_OOM, "Out of memory"},
305 {TDB_ERR_EXISTS, "Record exists"},
306 {TDB_ERR_NOLOCK, "Lock exists on other keys"},
307 {TDB_ERR_EINVAL, "Invalid parameter"},
308 {TDB_ERR_NOEXIST, "Record does not exist"},
309 {TDB_ERR_RDONLY, "write not permitted"} };
311 /* Error string for the last tdb error */
312 const char *tdb_errorstr(struct tdb_context *tdb)
/* linear scan of emap; falls through to a fixed string for unknown codes */
315 for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
316 if (tdb->ecode == emap[i].ecode)
317 return emap[i].estring;
318 return "Invalid error code";
/* High bit of ltype marks a lock as "mark only": bookkeeping is updated
 * but no fcntl lock is actually taken or released. */
323 #define TDB_MARK_LOCK 0x80000000
325 /* a byte range locking function - return 0 on success
326 this functions locks/unlocks 1 byte at the specified offset.
328 On error, errno is also set so that errors are passed back properly
331 note that a len of zero means lock to end of file
/* NOTE(review): parts of the body (flock setup, the do/while head,
 * success/failure returns) are elided from this extract. */
333 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
334 int rw_type, int lck_type, int probe, size_t len)
339 if (tdb->flags & TDB_NOLOCK) {
/* write locks are refused on read-only handles */
343 if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
344 tdb->ecode = TDB_ERR_RDONLY;
349 fl.l_whence = SEEK_SET;
/* retry fcntl() if interrupted by a signal */
355 ret = fcntl(tdb->fd,lck_type,&fl);
356 } while (ret == -1 && errno == EINTR);
359 /* Generic lock error. errno set by fcntl.
360 * EAGAIN is an expected return from non-blocking
362 if (!probe && lck_type != F_SETLK) {
363 /* Ensure error code is set for log fn to examine. */
364 tdb->ecode = TDB_ERR_LOCK;
365 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
366 tdb->fd, offset, rw_type, lck_type, (int)len));
368 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
375 upgrade a read lock to a write lock. This needs to be handled in a
376 special way as some OSes (such as solaris) have too conservative
377 deadlock detection and claim a deadlock when progress can be
378 made. For those OSes we may loop for a while.
/* NOTE(review): the retry-count loop around this body is elided. */
380 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
385 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
/* only spurious EDEADLK results are retried; other errors abort */
388 if (errno != EDEADLK) {
391 /* sleep for as short a time as we can - more portable than usleep() */
394 select(0, NULL, NULL, NULL, &tv);
396 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
401 /* lock a list in the database. list -1 is the alloc list */
/* NOTE(review): several statements (returns, error paths, the brlock
 * failure branch) are elided from this extract. */
402 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
404 struct tdb_lock_type *new_lck;
406 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
408 ltype &= ~TDB_MARK_LOCK;
410 /* a global lock allows us to avoid per chain locks */
411 if (tdb->global_lock.count &&
412 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
/* a global lock of an incompatible type forbids chain locks */
416 if (tdb->global_lock.count) {
417 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
420 if (list < -1 || list >= (int)tdb->header.hash_size) {
421 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
425 if (tdb->flags & TDB_NOLOCK)
/* if we already hold this chain lock, just bump its refcount */
428 for (i=0; i<tdb->num_lockrecs; i++) {
429 if (tdb->lockrecs[i].list == list) {
430 if (tdb->lockrecs[i].count == 0) {
432 * Can't happen, see tdb_unlock(). It should
435 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
436 "lck->count == 0 for list %d", list));
439 * Just increment the in-memory struct, posix locks
442 tdb->lockrecs[i].count++;
/* grow the in-memory lock array for the new entry */
447 new_lck = (struct tdb_lock_type *)realloc(
449 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
450 if (new_lck == NULL) {
454 tdb->lockrecs = new_lck;
456 /* Since fcntl locks don't nest, we do a lock for the first one,
457 and simply bump the count for future ones */
459 tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
/* record the newly-acquired lock */
466 tdb->lockrecs[tdb->num_lockrecs].list = list;
467 tdb->lockrecs[tdb->num_lockrecs].count = 1;
468 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
469 tdb->num_lockrecs += 1;
474 /* lock a list in the database. list -1 is the alloc list */
/* Blocking variant: waits (F_SETLKW) and logs on failure. */
475 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
478 ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
480 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
481 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
486 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
487 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
489 return _tdb_lock(tdb, list, ltype, F_SETLK);
493 /* unlock the database: returns void because it's too late for errors. */
494 /* changed to return int: it may be interesting to know when there
495 has been an error --simo */
/* NOTE(review): some statements (returns, the mark_lock branch around
 * the brlock call) are elided from this extract. */
496 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
500 struct tdb_lock_type *lck = NULL;
501 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
503 ltype &= ~TDB_MARK_LOCK;
505 /* a global lock allows us to avoid per chain locks */
506 if (tdb->global_lock.count &&
507 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
511 if (tdb->global_lock.count) {
512 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
515 if (tdb->flags & TDB_NOLOCK)
519 if (list < -1 || list >= (int)tdb->header.hash_size) {
520 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
/* find the in-memory record for this chain lock */
524 for (i=0; i<tdb->num_lockrecs; i++) {
525 if (tdb->lockrecs[i].list == list) {
526 lck = &tdb->lockrecs[i];
/* unlocking a chain we never locked is an error */
531 if ((lck == NULL) || (lck->count == 0)) {
532 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
/* nested lock: just drop the refcount */
536 if (lck->count > 1) {
542 * This lock has count==1 left, so we need to unlock it in the
543 * kernel. We don't bother with decrementing the in-memory array
544 * element, we're about to overwrite it with the last array element
551 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
557 * Shrink the array by overwriting the element just unlocked with the
558 * last array element.
561 if (tdb->num_lockrecs > 1) {
562 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
564 tdb->num_lockrecs -= 1;
567 * We don't bother with realloc when the array shrinks, but if we have
568 * a completely idle tdb we should get rid of the locked array.
571 if (tdb->num_lockrecs == 0) {
572 SAFE_FREE(tdb->lockrecs);
576 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
581 get the transaction lock
/* Takes the single TRANSACTION_LOCK byte; a global lock or an already
 * held transaction lock makes this a no-op. */
583 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
585 if (tdb->have_transaction_lock || tdb->global_lock.count) {
588 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
589 F_SETLKW, 0, 1) == -1) {
590 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
591 tdb->ecode = TDB_ERR_LOCK;
594 tdb->have_transaction_lock = 1;
599 release the transaction lock
601 int tdb_transaction_unlock(struct tdb_context *tdb)
604 if (!tdb->have_transaction_lock) {
607 ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
/* NOTE(review): the elided line presumably clears the flag only when
 * the unlock succeeded -- confirm against the full source. */
609 tdb->have_transaction_lock = 0;
617 /* lock/unlock entire database */
/* Takes one fcntl lock over the whole hash-table region; incompatible
 * with any individual chain locks currently held. */
618 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
620 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
622 ltype &= ~TDB_MARK_LOCK;
624 /* There are no locks on read-only dbs */
625 if (tdb->read_only || tdb->traverse_read)
626 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
/* nested global lock of the same type: just bump the refcount */
628 if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
629 tdb->global_lock.count++;
633 if (tdb->global_lock.count) {
634 /* a global lock of a different type exists */
635 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
638 if (tdb->num_locks != 0) {
639 /* can't combine global and chain locks */
640 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
/* NOTE(review): the mark_lock guard on this brlock call is elided. */
644 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
645 0, 4*tdb->header.hash_size)) {
646 if (op == F_SETLKW) {
647 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
652 tdb->global_lock.count = 1;
653 tdb->global_lock.ltype = ltype;
660 /* unlock entire db */
/* Releases the whole-file lock taken by _tdb_lockall; the ltype must
 * match the one the global lock was taken with. */
661 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
663 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
665 ltype &= ~TDB_MARK_LOCK;
667 /* There are no locks on read-only dbs */
668 if (tdb->read_only || tdb->traverse_read) {
669 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
672 if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
673 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
/* nested global lock: just drop the refcount */
676 if (tdb->global_lock.count > 1) {
677 tdb->global_lock.count--;
/* NOTE(review): the mark_lock guard on this brlock call is elided. */
682 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
683 0, 4*tdb->header.hash_size)) {
684 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
688 tdb->global_lock.count = 0;
689 tdb->global_lock.ltype = 0;
/* Public whole-database lock API: thin wrappers around _tdb_lockall /
 * _tdb_unlockall with the appropriate lock type and blocking mode. */
694 /* lock entire database with write lock */
695 int tdb_lockall(struct tdb_context *tdb)
697 return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
700 /* lock entire database with write lock - mark only */
701 int tdb_lockall_mark(struct tdb_context *tdb)
703 return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
706 /* unlock entire database with write lock - unmark only */
707 int tdb_lockall_unmark(struct tdb_context *tdb)
709 return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
712 /* lock entire database with write lock - nonblocking variant */
713 int tdb_lockall_nonblock(struct tdb_context *tdb)
715 return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
718 /* unlock entire database with write lock */
719 int tdb_unlockall(struct tdb_context *tdb)
721 return _tdb_unlockall(tdb, F_WRLCK);
724 /* lock entire database with read lock */
725 int tdb_lockall_read(struct tdb_context *tdb)
727 return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
730 /* lock entire database with read lock - non-blocking variant */
731 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
733 return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
736 /* unlock entire database with read lock */
737 int tdb_unlockall_read(struct tdb_context *tdb)
739 return _tdb_unlockall(tdb, F_RDLCK);
/* Per-chain lock API: each wrapper hashes the key and locks/unlocks the
 * corresponding hash-chain bucket. */
742 /* lock/unlock one hash chain. This is meant to be used to reduce
743 contention - it cannot guarantee how many records will be locked */
744 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
746 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
749 /* lock/unlock one hash chain, non-blocking. This is meant to be used
750 to reduce contention - it cannot guarantee how many records will be
752 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
754 return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
757 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
758 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
760 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
763 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
764 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
766 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
769 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
771 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
774 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
776 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
779 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
781 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
786 /* record lock stops delete underneath */
/* off == 0 means "no record" and is trivially a success. */
787 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
789 return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
793 Write locks override our own fcntl readlocks, so check it here.
794 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
795 an error to fail to get the lock here.
/* NOTE(review): the traversal-conflict check inside this loop is
 * elided from this extract. */
797 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
799 struct tdb_traverse_lock *i;
800 for (i = &tdb->travlocks; i; i = i->next)
803 return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
807 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
808 an error to fail to get the lock here.
810 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
812 return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
815 /* fcntl locks don't stack: avoid unlocking someone else's */
/* Only drop the kernel lock when exactly one traversal holds it. */
816 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
818 struct tdb_traverse_lock *i;
823 for (i = &tdb->travlocks; i; i = i->next)
826 return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
831 /* check for an out of bounds access - if it is out of bounds then
832 see if the database has been expanded by someone else and expand
834 note that "len" is the minimum length needed for the db
836 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
/* fast path: already mapped far enough */
839 if (len <= tdb->map_size)
841 if (tdb->flags & TDB_INTERNAL) {
/* NOTE(review): the !probe guard around this log call is elided. */
843 /* Ensure ecode is set for log fn. */
844 tdb->ecode = TDB_ERR_IO;
845 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
846 (int)len, (int)tdb->map_size));
848 return TDB_ERRCODE(TDB_ERR_IO, -1);
851 if (fstat(tdb->fd, &st) == -1) {
852 return TDB_ERRCODE(TDB_ERR_IO, -1);
/* the file really is too short: another process has not expanded it */
855 if (st.st_size < (size_t)len) {
857 /* Ensure ecode is set for log fn. */
858 tdb->ecode = TDB_ERR_IO;
859 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
860 (int)len, (int)st.st_size));
862 return TDB_ERRCODE(TDB_ERR_IO, -1);
865 /* Unmap, update size, remap */
866 if (tdb_munmap(tdb) == -1)
867 return TDB_ERRCODE(TDB_ERR_IO, -1);
868 tdb->map_size = st.st_size;
873 /* write a lump of data at a specified offset */
874 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
875 const void *buf, tdb_len_t len)
881 if (tdb->read_only || tdb->traverse_read) {
882 tdb->ecode = TDB_ERR_RDONLY;
/* validate/expand before touching the mapping */
886 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
/* mapped: copy straight into the mmap; otherwise fall back to pwrite */
890 memcpy(off + (char *)tdb->map_ptr, buf, len);
891 } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
892 /* Ensure ecode is set for log fn. */
893 tdb->ecode = TDB_ERR_IO;
894 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
895 off, len, strerror(errno)));
896 return TDB_ERRCODE(TDB_ERR_IO, -1);
901 /* Endian conversion: we only ever deal with 4 byte quantities */
/* Byte-swaps the buffer in place, one u32 at a time; presumably the
 * elided tail returns buf (so it can be used inside CONVERT()). */
902 void *tdb_convert(void *buf, u32 size)
904 u32 i, *p = (u32 *)buf;
905 for (i = 0; i < size / 4; i++)
906 p[i] = TDB_BYTEREV(p[i]);
911 /* read a lump of data at a specified offset, maybe convert */
/* cv non-zero requests endian conversion of the bytes just read. */
912 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
913 tdb_len_t len, int cv)
915 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
/* mapped: memcpy from the mmap; otherwise fall back to pread */
920 memcpy(buf, off + (char *)tdb->map_ptr, len);
922 ssize_t ret = pread(tdb->fd, buf, len, off);
923 if (ret != (ssize_t)len) {
924 /* Ensure ecode is set for log fn. */
925 tdb->ecode = TDB_ERR_IO;
926 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
927 "len=%d ret=%d (%s) map_size=%d\n",
928 (int)off, (int)len, (int)ret, strerror(errno),
929 (int)tdb->map_size));
930 return TDB_ERRCODE(TDB_ERR_IO, -1);
934 tdb_convert(buf, len);
942 do an unlocked scan of the hash table heads to find the next non-zero head. The value
943 will then be confirmed with the lock held
/* Two strategies: scan the mmap directly when mapped (fast, unlocked),
 * otherwise read each head through tdb_ofs_read. */
945 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
949 for (;h < tdb->header.hash_size;h++) {
950 if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
956 for (;h < tdb->header.hash_size;h++) {
/* treat a read error the same as a non-empty head: stop and report h */
957 if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
/* Drop the current file mapping; a no-op for in-memory (TDB_INTERNAL)
 * databases, whose map_ptr is heap memory, not an mmap. */
966 int tdb_munmap(struct tdb_context *tdb)
968 if (tdb->flags & TDB_INTERNAL)
973 int ret = munmap(tdb->map_ptr, tdb->map_size);
/* Map (or re-map) the database file.  On mmap failure the code falls
 * back to pread/pwrite by leaving map_ptr NULL. */
982 void tdb_mmap(struct tdb_context *tdb)
984 if (tdb->flags & TDB_INTERNAL)
988 if (!(tdb->flags & TDB_NOMMAP)) {
989 tdb->map_ptr = mmap(NULL, tdb->map_size,
990 PROT_READ|(tdb->read_only? 0:PROT_WRITE),
991 MAP_SHARED|MAP_FILE, tdb->fd, 0);
994 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
997 if (tdb->map_ptr == MAP_FAILED) {
999 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1000 tdb->map_size, strerror(errno)));
1003 tdb->map_ptr = NULL;
1006 tdb->map_ptr = NULL;
1010 /* expand a file. we prefer to use ftruncate, as that is what posix
1011 says to use for mmap expansion */
1012 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1016 if (tdb->read_only || tdb->traverse_read) {
1017 tdb->ecode = TDB_ERR_RDONLY;
/* if ftruncate fails, fall back to writing one byte at the new end */
1021 if (ftruncate(tdb->fd, size+addition) == -1) {
1023 if (pwrite(tdb->fd, &b, 1, (size+addition) - 1) != 1) {
1024 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1025 size+addition, strerror(errno)));
1030 /* now fill the file with something. This ensures that the
1031 file isn't sparse, which would be very bad if we ran out of
1032 disk. This must be done with write, not via mmap */
1033 memset(buf, TDB_PAD_BYTE, sizeof(buf));
/* NOTE(review): the surrounding while-loop head and the updates of
 * size/addition per iteration are elided from this extract. */
1035 int n = addition>sizeof(buf)?sizeof(buf):addition;
1036 int ret = pwrite(tdb->fd, buf, n, size);
1038 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1039 n, strerror(errno)));
1049 /* expand the database at least size bytes by expanding the underlying
1050 file and doing the mmap again if necessary */
1051 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1053 struct list_struct rec;
/* the freelist (-1) lock serialises all expansions */
1056 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1057 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1061 /* must know about any previous expansions by another process */
1062 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1064 /* always make room for at least 10 more records, and round
1065 the database up to a multiple of the page size */
1066 size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1068 if (!(tdb->flags & TDB_INTERNAL))
1072 * We must ensure the file is unmapped before doing this
1073 * to ensure consistency with systems like OpenBSD where
1074 * writes and mmaps are not consistent.
1077 /* expand the file itself */
1078 if (!(tdb->flags & TDB_INTERNAL)) {
1079 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1083 tdb->map_size += size;
/* in-memory database: grow the heap buffer instead of the file */
1085 if (tdb->flags & TDB_INTERNAL) {
1086 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
/* realloc failure path: roll the size back (elided branch) */
1089 tdb->map_size -= size;
1092 tdb->map_ptr = new_map_ptr;
1095 * We must ensure the file is remapped before adding the space
1096 * to ensure consistency with systems like OpenBSD where
1097 * writes and mmaps are not consistent.
1100 /* We're ok if the mmap fails as we'll fallback to read/write */
1104 /* form a new freelist record */
1105 memset(&rec,'\0',sizeof(rec));
1106 rec.rec_len = size - sizeof(rec);
1108 /* link it into the free list */
1109 offset = tdb->map_size - size;
1110 if (tdb_free(tdb, offset, &rec) == -1)
1113 tdb_unlock(tdb, -1, F_WRLCK);
/* failure path: release the freelist lock before returning -1 */
1116 tdb_unlock(tdb, -1, F_WRLCK);
1120 /* read/write a tdb_off_t */
/* tdb_ofs_read converts on read (DOCONV); tdb_ofs_write converts a
 * local copy (CONVERT(off)) so the caller's value is not byte-swapped. */
1121 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1123 return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1126 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1129 return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1133 /* read a lump of data, allocating the space for it */
/* Returns a malloc'd buffer the caller must free, or NULL on error. */
1134 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1138 /* some systems don't like zero length malloc */
1143 if (!(buf = (unsigned char *)malloc(len))) {
1144 /* Ensure ecode is set for log fn. */
1145 tdb->ecode = TDB_ERR_OOM;
1146 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1147 len, strerror(errno)));
1148 return TDB_ERRCODE(TDB_ERR_OOM, buf);
/* NOTE(review): the elided branch presumably frees buf and returns
 * NULL on read failure -- confirm against the full source. */
1150 if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1157 /* Give a piece of tdb data to a parser */
1159 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1160 tdb_off_t offset, tdb_len_t len,
1161 int (*parser)(TDB_DATA key, TDB_DATA data,
1162 void *private_data),
/* NOTE(review): the private_data parameter line is elided here. */
1170 if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1172 * Optimize by avoiding the malloc/memcpy/free, point the
1173 * parser directly at the mmap area.
1175 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1178 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1179 return parser(key, data, private_data);
/* slow path: copy the record out, parse it, then free the copy */
1182 if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1186 result = parser(key, data, private_data);
1191 /* read/write a record */
/* tdb_rec_read also validates the record magic and bounds-checks the
 * next pointer so corruption is caught as early as possible. */
1192 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1194 if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1196 if (TDB_BAD_MAGIC(rec)) {
1197 /* Ensure ecode is set for log fn. */
1198 tdb->ecode = TDB_ERR_CORRUPT;
1199 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1200 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1202 return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
/* write a local copy so CONVERT() never byte-swaps the caller's rec */
1205 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1207 struct list_struct r = *rec;
1208 return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
/* Default (non-transaction) I/O method table.
 * NOTE(review): some initializer entries are elided from this extract. */
1211 static const struct tdb_methods io_methods = {
1214 tdb_next_hash_chain,
1221 initialise the default methods table
1223 void tdb_io_init(struct tdb_context *tdb)
1225 tdb->methods = &io_methods;
1228 /* file: transaction.c */
1233 - only allow a single transaction at a time per database. This makes
1234 using the transaction API simpler, as otherwise the caller would
1235 have to cope with temporary failures in transactions that conflict
1236 with other current transactions
1238 - keep the transaction recovery information in the same file as the
1239 database, using a special 'transaction recovery' record pointed at
1240 by the header. This removes the need for extra journal files as
1241 used by some other databases
1243 - dynamically allocate the transaction recovery record, re-using it
1244 for subsequent transactions. If a larger record is needed then
1245 tdb_free() the old record to place it on the normal tdb freelist
1246 before allocating the new record
1248 - during transactions, keep a linked list of all writes that have
1249 been performed by intercepting all tdb_write() calls. The hooked
1250 transaction versions of tdb_read() and tdb_write() check this
1251 linked list and try to use the elements of the list in preference
1252 to the real database.
1254 - don't allow any locks to be held when a transaction starts,
1255 otherwise we can end up with deadlock (plus lack of lock nesting
1256 in posix locks would mean the lock is lost)
1258 - if the caller gains a lock during the transaction but doesn't
1259 release it then fail the commit
1261 - allow for nested calls to tdb_transaction_start(), re-using the
1262 existing transaction record. If the inner transaction is cancelled
1263 then a subsequent commit will fail
1265 - keep a mirrored copy of the tdb hash chain heads to allow for the
1266 fast hash heads scan on traverse, updating the mirrored copy in
1267 the transaction version of tdb_write
1269 - allow callers to mix transaction and non-transaction use of tdb,
1270 although once a transaction is started then an exclusive lock is
1271 gained until the transaction is committed or cancelled
1273 - the commit strategy involves first saving away all modified data
1274 into a linearised buffer in the transaction recovery area, then
1275 marking the transaction recovery area with a magic value to
1276 indicate a valid recovery record. In total 4 fsync/msync calls are
1277 needed per commit to prevent race conditions. It might be possible
1278 to reduce this to 3 or even 2 with some more work.
1280 - check for a valid recovery record on open of the tdb, while the
1281 global lock is held. Automatically recover from the transaction
1282 recovery area if needed, then continue with the open as
1283 usual. This allows for smooth crash recovery with no administrator
1286 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1287 still available, but no transaction recovery area is used and no
1288 fsync/msync calls are made.
/* one element of the in-memory write log: a [offset, length) span of the
   database with its pending data (fields partially elided in this view) */
1292 struct tdb_transaction_el {
1293 struct tdb_transaction_el *next, *prev;
1296 unsigned char *data;
1300 hold the context of any current transaction
/* per-tdb transaction state, hung off tdb->transaction while active */
1302 struct tdb_transaction {
1303 /* we keep a mirrored copy of the tdb hash heads here so
1304 tdb_next_hash_chain() can operate efficiently */
1307 /* the original io methods - used to do IOs to the real db */
1308 const struct tdb_methods *io_methods;
1310 /* the list of transaction elements. We use a doubly linked
1311 list with a last pointer to allow us to keep the list
1312 ordered, with first element at the front of the list. It
1313 needs to be doubly linked as the read/write traversals need
1314 to be backwards, while the commit needs to be forwards */
1315 struct tdb_transaction_el *elements, *elements_last;
1317 /* non-zero when an internal transaction error has
1318 occurred. All write operations will then fail until the
1319 transaction is ended */
1320 int transaction_error;
1322 /* when inside a transaction we need to keep track of any
1323 nested tdb_transaction_start() calls, as these are allowed,
1324 but don't create a new transaction */
1327 /* old file size before transaction */
1328 tdb_len_t old_map_size;
/* transaction-hooked read: serve data from the newest matching write-log
   element, splitting reads that only partially overlap an element (recursing
   for the uncovered pieces), and fall through to the real tdb_read for spans
   the log does not cover. On failure sets ecode and transaction_error. */
1333 read while in a transaction. We need to check first if the data is in our list
1334 of transaction elements, then if not do a real read
1336 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1337 tdb_len_t len, int cv)
1339 struct tdb_transaction_el *el;
1341 /* we need to walk the list backwards to get the most recent data */
1342 for (el=tdb->transaction->elements_last;el;el=el->prev) {
/* no overlap with this element: read lies entirely before or after it */
1345 if (off+len <= el->offset) {
1348 if (off >= el->offset + el->length) {
1352 /* an overlapping read - needs to be split into up to
1353 2 reads and a memcpy */
1354 if (off < el->offset) {
1355 partial = el->offset - off;
1356 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1361 buf = (void *)(partial + (char *)buf);
1363 if (off + len <= el->offset + el->length) {
1366 partial = el->offset + el->length - off;
/* copy the overlapping middle straight out of the log element */
1368 memcpy(buf, el->data + (off - el->offset), partial);
1370 tdb_convert(buf, len);
1374 buf = (void *)(partial + (char *)buf);
1376 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1383 /* it's not in the transaction elements - do a real read */
1384 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
/* shared failure exit (label elided in this view) */
1387 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1388 tdb->ecode = TDB_ERR_IO;
1389 tdb->transaction->transaction_error = 1;
/* transaction-hooked write: record the write in the in-memory log instead of
   touching the file. Mirrors hash-head updates into the cached hash_heads
   array, merges into overlapping elements (splitting as needed), appends to a
   directly-adjacent element when possible, else allocates a new element at the
   list tail. buf==NULL means "write zero/pad bytes" (see the memset paths). */
1395 write while in a transaction
1397 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1398 const void *buf, tdb_len_t len)
1400 struct tdb_transaction_el *el, *best_el=NULL;
1406 /* if the write is to a hash head, then update the transaction
1408 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1409 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1410 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1411 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1414 /* first see if we can replace an existing entry */
1415 for (el=tdb->transaction->elements_last;el;el=el->prev) {
/* remember an element our write would directly follow, for the append case */
1418 if (best_el == NULL && off == el->offset+el->length) {
1422 if (off+len <= el->offset) {
1425 if (off >= el->offset + el->length) {
1429 /* an overlapping write - needs to be split into up to
1430 2 writes and a memcpy */
1431 if (off < el->offset) {
1432 partial = el->offset - off;
1433 if (transaction_write(tdb, off, buf, partial) != 0) {
1438 buf = (const void *)(partial + (const char *)buf);
1440 if (off + len <= el->offset + el->length) {
1443 partial = el->offset + el->length - off;
1445 memcpy(el->data + (off - el->offset), buf, partial);
1448 buf = (const void *)(partial + (const char *)buf);
1450 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1457 /* see if we can append the new entry to an existing entry */
/* the old_map_size comparison avoids merging across the original EOF -
   presumably to keep recovery-area bookkeeping correct; confirm in full src */
1458 if (best_el && best_el->offset + best_el->length == off &&
1459 (off+len < tdb->transaction->old_map_size ||
1460 off > tdb->transaction->old_map_size)) {
1461 unsigned char *data = best_el->data;
1463 el->data = (unsigned char *)realloc(el->data,
1465 if (el->data == NULL) {
1466 tdb->ecode = TDB_ERR_OOM;
1467 tdb->transaction->transaction_error = 1;
1472 memcpy(el->data + el->length, buf, len);
/* buf==NULL: extend with pad bytes instead of caller data */
1474 memset(el->data + el->length, TDB_PAD_BYTE, len);
1480 /* add a new entry at the end of the list */
1481 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1483 tdb->ecode = TDB_ERR_OOM;
1484 tdb->transaction->transaction_error = 1;
1488 el->prev = tdb->transaction->elements_last;
1491 el->data = (unsigned char *)malloc(len);
1492 if (el->data == NULL) {
/* on OOM the element itself is freed on an elided line */
1494 tdb->ecode = TDB_ERR_OOM;
1495 tdb->transaction->transaction_error = 1;
1499 memcpy(el->data, buf, len);
1501 memset(el->data, TDB_PAD_BYTE, len);
1504 el->prev->next = el;
1506 tdb->transaction->elements = el;
1508 tdb->transaction->elements_last = el;
/* shared failure exit (label elided in this view) */
1512 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1513 tdb->ecode = TDB_ERR_IO;
1514 tdb->transaction->transaction_error = 1;
/* scan the cached hash_heads copy for the next non-empty chain at or after
   *chain, avoiding file reads during traverse inside a transaction */
1519 accelerated hash chain head search, using the cached hash heads
1521 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1524 for (;h < tdb->header.hash_size;h++) {
1525 /* the +1 takes account of the freelist */
1526 if (0 != tdb->transaction->hash_heads[h+1]) {
/* bounds check against the (possibly transaction-grown) map_size; returns
   an I/O error code for offsets beyond it */
1534 out of bounds check during a transaction
1536 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1538 if (len <= tdb->map_size) {
1541 return TDB_ERRCODE(TDB_ERR_IO, -1);
/* grow the file inside a transaction: record a NULL-buffer (pad-byte) write
   in the log so later transaction reads of the new region see zero data,
   without touching the real file until commit */
1545 transaction version of tdb_expand().
1547 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1550 /* add a write to the transaction elements, so subsequent
1551 reads see the zero data */
1552 if (transaction_write(tdb, size, NULL, addition) != 0) {
/* byte-range locks are no-ops inside a transaction: the transaction already
   holds the locks it needs */
1560 brlock during a transaction - ignore them
1562 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1563 int rw_type, int lck_type, int probe, size_t len)
/* method table swapped in while a transaction is active (other entries on
   elided lines) */
1568 static const struct tdb_methods transaction_methods = {
1571 transaction_next_hash_chain,
1573 transaction_expand_file,
/* begin a transaction: reject read-only/internal dbs and active traverses,
   handle nesting by bumping a counter, take the transaction lock plus a read
   lock over the data area, snapshot the hash heads, then hook the I/O methods
   so all reads/writes go through the transaction layer. The final
   transaction_write of the hash heads pre-populates the log so later hash-head
   updates merge into one element instead of growing the list. Error unwinding
   (labels) is on elided lines. */
1579 start a tdb transaction. No token is returned, as only a single
1580 transaction is allowed to be pending per tdb_context
1582 int tdb_transaction_start(struct tdb_context *tdb)
1584 /* some sanity checks */
1585 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1586 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1587 tdb->ecode = TDB_ERR_EINVAL;
1591 /* cope with nested tdb_transaction_start() calls */
1592 if (tdb->transaction != NULL) {
1593 tdb->transaction->nesting++;
1594 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1595 tdb->transaction->nesting));
1599 if (tdb->num_locks != 0 || tdb->global_lock.count) {
1600 /* the caller must not have any locks when starting a
1601 transaction as otherwise we'll be screwed by lack
1602 of nested locks in posix */
1603 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1604 tdb->ecode = TDB_ERR_LOCK;
1608 if (tdb->travlocks.next != NULL) {
1609 /* you cannot use transactions inside a traverse (although you can use
1610 traverse inside a transaction) as otherwise you can end up with
1612 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1613 tdb->ecode = TDB_ERR_LOCK;
1617 tdb->transaction = (struct tdb_transaction *)
1618 calloc(sizeof(struct tdb_transaction), 1);
1619 if (tdb->transaction == NULL) {
1620 tdb->ecode = TDB_ERR_OOM;
1624 /* get the transaction write lock. This is a blocking lock. As
1625 discussed with Volker, there are a number of ways we could
1626 make this async, which we will probably do in the future */
1627 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1628 SAFE_FREE(tdb->transaction);
1632 /* get a read lock from the freelist to the end of file. This
1633 is upgraded to a write lock during the commit */
1634 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1635 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1636 tdb->ecode = TDB_ERR_LOCK;
1640 /* setup a copy of the hash table heads so the hash scan in
1641 traverse can be fast */
1642 tdb->transaction->hash_heads = (u32 *)
1643 calloc(tdb->header.hash_size+1, sizeof(u32));
1644 if (tdb->transaction->hash_heads == NULL) {
1645 tdb->ecode = TDB_ERR_OOM;
1648 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1649 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1650 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1651 tdb->ecode = TDB_ERR_IO;
1655 /* make sure we know about any file expansions already done by
1657 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1658 tdb->transaction->old_map_size = tdb->map_size;
1660 /* finally hook the io methods, replacing them with
1661 transaction specific methods */
1662 tdb->transaction->io_methods = tdb->methods;
1663 tdb->methods = &transaction_methods;
1665 /* by calling this transaction write here, we ensure that we don't grow the
1666 transaction linked list due to hash table updates */
1667 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1668 TDB_HASHTABLE_SIZE(tdb)) != 0) {
1669 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1670 tdb->ecode = TDB_ERR_IO;
1671 tdb->methods = tdb->transaction->io_methods;
/* failure exit: drop locks and free everything (labels elided in this view) */
1678 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1679 tdb_transaction_unlock(tdb);
1680 SAFE_FREE(tdb->transaction->hash_heads);
1681 SAFE_FREE(tdb->transaction);
/* abort the current transaction: for a nested call just decrement nesting and
   poison the outer transaction; otherwise restore map_size, free the write
   log, release any locks taken during the transaction, restore the original
   I/O methods, and drop the transaction locks. Also used by commit for its
   final cleanup. */
1687 cancel the current transaction
1689 int tdb_transaction_cancel(struct tdb_context *tdb)
1691 if (tdb->transaction == NULL) {
1692 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1696 if (tdb->transaction->nesting != 0) {
/* inner cancel poisons the whole transaction so the outer commit fails */
1697 tdb->transaction->transaction_error = 1;
1698 tdb->transaction->nesting--;
1702 tdb->map_size = tdb->transaction->old_map_size;
1704 /* free all the transaction elements */
1705 while (tdb->transaction->elements) {
1706 struct tdb_transaction_el *el = tdb->transaction->elements;
1707 tdb->transaction->elements = el->next;
1712 /* remove any global lock created during the transaction */
1713 if (tdb->global_lock.count != 0) {
1714 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1715 tdb->global_lock.count = 0;
1718 /* remove any locks created during the transaction */
1719 if (tdb->num_locks != 0) {
1721 for (i=0;i<tdb->num_lockrecs;i++) {
1722 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1723 F_UNLCK,F_SETLKW, 0, 1);
1726 tdb->num_lockrecs = 0;
1727 SAFE_FREE(tdb->lockrecs);
1730 /* restore the normal io methods */
1731 tdb->methods = tdb->transaction->io_methods;
1733 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1734 tdb_transaction_unlock(tdb);
1735 SAFE_FREE(tdb->transaction->hash_heads);
1736 SAFE_FREE(tdb->transaction);
/* force [offset, offset+length) to stable storage: fsync the fd, and when the
   db is mmap'd additionally msync the affected pages (offset rounded down to
   a page boundary, assuming page_size is a power of two) */
1744 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1746 if (fsync(tdb->fd) != 0) {
1747 tdb->ecode = TDB_ERR_IO;
1748 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1751 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
1753 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1754 if (msync(moffset + (char *)tdb->map_ptr,
1755 length + (offset - moffset), MS_SYNC) != 0) {
1756 tdb->ecode = TDB_ERR_IO;
1757 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
/* size of the linearised recovery blob: a u32 tailer plus, for each log
   element inside the original file size, an (offset,length) header pair and
   the element's data; writes beyond old_map_size need no recovery copy */
1768 work out how much space the linearised recovery data will consume
1770 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1772 struct tdb_transaction_el *el;
1773 tdb_len_t recovery_size = 0;
1775 recovery_size = sizeof(u32);
1776 for (el=tdb->transaction->elements;el;el=el->next) {
1777 if (el->offset >= tdb->transaction->old_map_size) {
1780 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1783 return recovery_size;
/* find or create space for the recovery record: reuse the existing recovery
   area when it is big enough, otherwise free it and append a fresh,
   page-aligned area at end of file (deliberately NOT via tdb_allocate, which
   could hand back space still referenced by pre-transaction readers), then
   persist the new recovery head pointer. */
1787 allocate the recovery area, or use an existing recovery area if it is
1790 static int tdb_recovery_allocate(struct tdb_context *tdb,
1791 tdb_len_t *recovery_size,
1792 tdb_off_t *recovery_offset,
1793 tdb_len_t *recovery_max_size)
1795 struct list_struct rec;
1796 const struct tdb_methods *methods = tdb->transaction->io_methods;
1797 tdb_off_t recovery_head;
1799 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1800 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1806 if (recovery_head != 0 &&
1807 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1808 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1812 *recovery_size = tdb_recovery_size(tdb);
1814 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1815 /* it fits in the existing area */
1816 *recovery_max_size = rec.rec_len;
1817 *recovery_offset = recovery_head;
1821 /* we need to free up the old recovery area, then allocate a
1822 new one at the end of the file. Note that we cannot use
1823 tdb_allocate() to allocate the new one as that might return
1824 us an area that is being currently used (as of the start of
1826 if (recovery_head != 0) {
1827 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1828 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1833 /* the tdb_free() call might have increased the recovery size */
1834 *recovery_size = tdb_recovery_size(tdb);
1836 /* round up to a multiple of page size */
1837 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1838 *recovery_offset = tdb->map_size;
1839 recovery_head = *recovery_offset;
1841 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1842 (tdb->map_size - tdb->transaction->old_map_size) +
1843 sizeof(rec) + *recovery_max_size) == -1) {
1844 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1848 /* remap the file (if using mmap) */
1849 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1851 /* we have to reset the old map size so that we don't try to expand the file
1852 again in the transaction commit, which would destroy the recovery area */
1853 tdb->transaction->old_map_size = tdb->map_size;
1855 /* write the recovery header offset and sync - we can sync without a race here
1856 as the magic ptr in the recovery record has not been set */
1857 CONVERT(recovery_head);
1858 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1859 &recovery_head, sizeof(tdb_off_t)) == -1) {
1860 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
/* build and persist the crash-recovery journal before commit: allocate the
   recovery area, linearise the OLD contents of every region the transaction
   will overwrite (read via the original methods, not the hooked ones) into
   one blob, write it, sync, then write + sync the magic marker that makes
   the record valid. Two separate syncs enforce write ordering. Returns the
   magic offset so commit can later clear the marker. */
1869 setup the recovery data that will be used on a crash during commit
1871 static int transaction_setup_recovery(struct tdb_context *tdb,
1872 tdb_off_t *magic_offset)
1874 struct tdb_transaction_el *el;
1875 tdb_len_t recovery_size;
1876 unsigned char *data, *p;
1877 const struct tdb_methods *methods = tdb->transaction->io_methods;
1878 struct list_struct *rec;
1879 tdb_off_t recovery_offset, recovery_max_size;
1880 tdb_off_t old_map_size = tdb->transaction->old_map_size;
1884 check that the recovery area has enough space
1886 if (tdb_recovery_allocate(tdb, &recovery_size,
1887 &recovery_offset, &recovery_max_size) == -1) {
1891 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1893 tdb->ecode = TDB_ERR_OOM;
/* the blob starts with a record header describing the recovery payload */
1897 rec = (struct list_struct *)data;
1898 memset(rec, 0, sizeof(*rec));
1901 rec->data_len = recovery_size;
1902 rec->rec_len = recovery_max_size;
/* key_len is reused to remember the pre-transaction EOF for recovery */
1903 rec->key_len = old_map_size;
1906 /* build the recovery data into a single blob to allow us to do a single
1907 large write, which should be more efficient */
1908 p = data + sizeof(*rec);
1909 for (el=tdb->transaction->elements;el;el=el->next) {
1910 if (el->offset >= old_map_size) {
1913 if (el->offset + el->length > tdb->transaction->old_map_size) {
1914 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1916 tdb->ecode = TDB_ERR_CORRUPT;
/* per-element header: 4-byte offset then 4-byte length */
1919 memcpy(p, &el->offset, 4);
1920 memcpy(p+4, &el->length, 4);
1924 /* the recovery area contains the old data, not the
1925 new data, so we have to call the original tdb_read
1927 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1929 tdb->ecode = TDB_ERR_IO;
1932 p += 8 + el->length;
1935 /* and the tailer */
1936 tailer = sizeof(*rec) + recovery_max_size;
1937 memcpy(p, &tailer, 4);
1940 /* write the recovery data to the recovery area */
1941 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1942 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1944 tdb->ecode = TDB_ERR_IO;
1948 /* as we don't have ordered writes, we have to sync the recovery
1949 data before we update the magic to indicate that the recovery
1951 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1958 magic = TDB_RECOVERY_MAGIC;
1961 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1963 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1964 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1965 tdb->ecode = TDB_ERR_IO;
1969 /* ensure the recovery magic marker is on disk */
1970 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
/* commit the current transaction. Order of operations: sanity checks and
   nesting, upgrade the data-area lock to write, take the global open lock,
   write + sync the recovery journal (unless TDB_NOSYNC), expand the file if
   the transaction grew it, replay the write log into the real file, sync,
   clear the recovery magic, sync again, then use tdb_transaction_cancel()
   purely as cleanup. A failed replay triggers tdb_transaction_recover(). */
1978 commit the current transaction
1980 int tdb_transaction_commit(struct tdb_context *tdb)
1982 const struct tdb_methods *methods;
1983 tdb_off_t magic_offset = 0;
1986 if (tdb->transaction == NULL) {
1987 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1991 if (tdb->transaction->transaction_error) {
1992 tdb->ecode = TDB_ERR_IO;
1993 tdb_transaction_cancel(tdb);
1994 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1998 if (tdb->transaction->nesting != 0) {
1999 tdb->transaction->nesting--;
2003 /* check for a null transaction */
2004 if (tdb->transaction->elements == NULL) {
2005 tdb_transaction_cancel(tdb);
2009 methods = tdb->transaction->io_methods;
2011 /* if there are any locks pending then the caller has not
2012 nested their locks properly, so fail the transaction */
2013 if (tdb->num_locks || tdb->global_lock.count) {
2014 tdb->ecode = TDB_ERR_LOCK;
2015 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2016 tdb_transaction_cancel(tdb);
2020 /* upgrade the main transaction lock region to a write lock */
2021 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
/* NOTE(review): log message below says "tdb_transaction_start" but this is
   the commit path - looks like a copy/paste slip in the message text */
2022 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2023 tdb->ecode = TDB_ERR_LOCK;
2024 tdb_transaction_cancel(tdb);
2028 /* get the global lock - this prevents new users attaching to the database
2029 during the commit */
2030 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2031 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2032 tdb->ecode = TDB_ERR_LOCK;
2033 tdb_transaction_cancel(tdb);
2037 if (!(tdb->flags & TDB_NOSYNC)) {
2038 /* write the recovery data to the end of the file */
2039 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2040 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2041 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2042 tdb_transaction_cancel(tdb);
2047 /* expand the file to the new size if needed */
2048 if (tdb->map_size != tdb->transaction->old_map_size) {
2049 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2051 tdb->transaction->old_map_size) == -1) {
2052 tdb->ecode = TDB_ERR_IO;
2053 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2054 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2055 tdb_transaction_cancel(tdb);
2058 tdb->map_size = tdb->transaction->old_map_size;
2059 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2062 /* perform all the writes */
2063 while (tdb->transaction->elements) {
2064 struct tdb_transaction_el *el = tdb->transaction->elements;
2066 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2067 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2069 /* we've overwritten part of the data and
2070 possibly expanded the file, so we need to
2071 run the crash recovery code */
2072 tdb->methods = methods;
2073 tdb_transaction_recover(tdb);
2075 tdb_transaction_cancel(tdb);
2076 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2078 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2081 tdb->transaction->elements = el->next;
2086 if (!(tdb->flags & TDB_NOSYNC)) {
2087 /* ensure the new data is on disk */
2088 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2092 /* remove the recovery marker */
2093 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2094 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2098 /* ensure the recovery marker has been removed on disk */
2099 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2104 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2107 TODO: maybe write to some dummy hdr field, or write to magic
2108 offset without mmap, before the last sync, instead of the
2112 /* on some systems (like Linux 2.6.x) changes via mmap/msync
2113 don't change the mtime of the file, this means the file may
2114 not be backed up (as tdb rounding to block sizes means that
2115 file size changes are quite rare too). The following forces
2116 mtime changes when a transaction completes */
2118 utime(tdb->name, NULL);
2121 /* use a transaction cancel to free memory and remove the
2122 transaction locks */
2123 tdb_transaction_cancel(tdb);
/* replay the recovery journal after a crash: locate the recovery record via
   the header pointer, validate its magic, copy each saved (offset,length,data)
   span back into the file, sync, invalidate the journal (clear head and/or
   magic), truncate the file back to the pre-transaction EOF saved in
   rec.key_len, and sync again. No-op when no valid recovery record exists. */
2129 recover from an aborted transaction. Must be called with exclusive
2130 database write access already established (including the global
2131 lock to prevent new processes attaching)
2133 int tdb_transaction_recover(struct tdb_context *tdb)
2135 tdb_off_t recovery_head, recovery_eof;
2136 unsigned char *data, *p;
2138 struct list_struct rec;
2140 /* find the recovery area */
2141 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2142 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2143 tdb->ecode = TDB_ERR_IO;
2147 if (recovery_head == 0) {
2148 /* we have never allocated a recovery record */
2152 /* read the recovery record */
2153 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2154 sizeof(rec), DOCONV()) == -1) {
2155 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2156 tdb->ecode = TDB_ERR_IO;
2160 if (rec.magic != TDB_RECOVERY_MAGIC) {
2161 /* there is no valid recovery data */
2165 if (tdb->read_only) {
2166 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2167 tdb->ecode = TDB_ERR_CORRUPT;
/* key_len holds the pre-transaction end-of-file (see setup_recovery) */
2171 recovery_eof = rec.key_len;
2173 data = (unsigned char *)malloc(rec.data_len);
2175 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2176 tdb->ecode = TDB_ERR_OOM;
2180 /* read the full recovery data */
2181 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2182 rec.data_len, 0) == -1) {
2183 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2184 tdb->ecode = TDB_ERR_IO;
2188 /* recover the file data */
/* walk the blob: each entry is 4-byte offset, 4-byte length, then data */
2190 while (p+8 < data + rec.data_len) {
2196 memcpy(&len, p+4, 4);
2198 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2200 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2201 tdb->ecode = TDB_ERR_IO;
2209 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2210 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2211 tdb->ecode = TDB_ERR_IO;
2215 /* if the recovery area is after the recovered eof then remove it */
2216 if (recovery_eof <= recovery_head) {
2217 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2218 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2219 tdb->ecode = TDB_ERR_IO;
2224 /* remove the recovery magic */
2225 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2227 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2228 tdb->ecode = TDB_ERR_IO;
2232 /* reduce the file size to the old size */
2234 if (ftruncate(tdb->fd, recovery_eof) != 0) {
2235 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2236 tdb->ecode = TDB_ERR_IO;
2239 tdb->map_size = recovery_eof;
2242 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2243 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2244 tdb->ecode = TDB_ERR_IO;
2248 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2255 /* file: freelist.c */
2257 /* read a freelist record and check for simple errors */
/* reads the record at 'off' into *rec; self-heals a record left with the
   in-use magic (interrupted delete) by rewriting it as free; rejects any
   other magic as corruption; bounds-checks the next pointer */
2258 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2260 if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2263 if (rec->magic == TDB_MAGIC) {
2264 /* this happens when an app is shut down while deleting a record - we should
2265 not completely fail when this happens */
2266 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2268 rec->magic = TDB_FREE_MAGIC;
2269 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2273 if (rec->magic != TDB_FREE_MAGIC) {
2274 /* Ensure ecode is set for log fn. */
2275 tdb->ecode = TDB_ERR_CORRUPT;
2276 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2278 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2280 if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2287 /* Remove an element from the freelist. Must have alloc lock. */
/* singly-linked unlink: walk next pointers from FREELIST_TOP until a pointer
   equal to 'off' is found, then overwrite it with 'next'; corruption error
   if 'off' is not on the list */
2288 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2290 tdb_off_t last_ptr, i;
2292 /* read in the freelist top */
2293 last_ptr = FREELIST_TOP;
2294 while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2296 /* We've found it! */
2297 return tdb_ofs_write(tdb, last_ptr, &next);
2299 /* Follow chain (next offset is at start of record) */
2302 TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2303 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2307 /* update a record tailer (must hold allocation lock) */
/* writes the record's total size (header + data area) into the tdb_off_t
   slot at the very end of the record, enabling leftward merge in tdb_free */
2308 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2309 const struct list_struct *rec)
2311 tdb_off_t totalsize;
2313 /* Offset of tailer from record header */
2314 totalsize = sizeof(*rec) + rec->rec_len;
2315 return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2319 /* Add an element into the freelist. Merge adjacent records if
/* under the allocation lock: write a provisional tailer, coalesce with a
   free neighbour on the right (via its header) and on the left (via the
   tailer just before this record), then push the merged record onto the
   front of the freelist */
2321 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2323 tdb_off_t right, left;
2325 /* Allocation and tailer lock */
2326 if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2329 /* set an initial tailer, so if we fail we don't leave a bogus record */
2330 if (update_tailer(tdb, offset, rec) != 0) {
2331 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2335 /* Look right first (I'm an Australian, dammit) */
2336 right = offset + sizeof(*rec) + rec->rec_len;
2337 if (right + sizeof(*rec) <= tdb->map_size) {
2338 struct list_struct r;
2340 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2341 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2345 /* If it's free, expand to include it. */
2346 if (r.magic == TDB_FREE_MAGIC) {
2347 if (remove_from_freelist(tdb, right, r.next) == -1) {
2348 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2351 rec->rec_len += sizeof(r) + r.rec_len;
/* leftward merge: the tailer sits immediately before our header */
2357 left = offset - sizeof(tdb_off_t);
2358 if (left > TDB_DATA_START(tdb->header.hash_size)) {
2359 struct list_struct l;
2362 /* Read in tailer and jump back to header */
2363 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2364 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2368 /* it could be uninitialised data */
2369 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2373 left = offset - leftsize;
2375 /* Now read in record */
2376 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2377 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2381 /* If it's free, expand to include it. */
2382 if (l.magic == TDB_FREE_MAGIC) {
2383 if (remove_from_freelist(tdb, left, l.next) == -1) {
2384 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2388 rec->rec_len += leftsize;
2394 if (update_tailer(tdb, offset, rec) == -1) {
2395 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2399 /* Now, prepend to free list */
2400 rec->magic = TDB_FREE_MAGIC;
2402 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2403 tdb_rec_write(tdb, offset, rec) == -1 ||
2404 tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2405 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2409 /* And we're done. */
2410 tdb_unlock(tdb, -1, F_WRLCK);
/* failure exit: release the allocation lock (label elided in this view) */
2414 tdb_unlock(tdb, -1, F_WRLCK);
/* carve an allocation out of the chosen freelist entry: split off a trailing
   free remainder when the entry is big enough, unlink the allocated part from
   the freelist, stamp it with the in-use magic BEFORE dropping any lock (so a
   concurrent tdb_free cannot merge with it), then free the remainder */
2420 the core of tdb_allocate - called when we have decided which
2421 free list entry to use
2423 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2424 struct list_struct *rec, tdb_off_t last_ptr)
2426 struct list_struct newrec;
2427 tdb_off_t newrec_ptr;
2429 memset(&newrec, '\0', sizeof(newrec));
2431 /* found it - now possibly split it up */
2432 if (rec->rec_len > length + MIN_REC_SIZE) {
2433 /* Length of left piece */
2434 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2436 /* Right piece to go on free list */
2437 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2438 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2440 /* And left record is shortened */
2441 rec->rec_len = length;
2446 /* Remove allocated record from the free list */
2447 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2451 /* Update header: do this before we drop alloc
2452 lock, otherwise tdb_free() might try to
2453 merge with us, thinking we're free.
2454 (Thanks Jeremy Allison). */
2455 rec->magic = TDB_MAGIC;
2456 if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2460 /* Did we create new block? */
2462 /* Update allocated record tailer (we
2464 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2468 /* Free new record */
2469 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2474 /* all done - return the new record offset */
/* NOTE(review): elided listing; tokens kept verbatim.  Best-fit
 * allocator: scans the whole freelist under the allocator lock (-1),
 * tracking the smallest entry >= length (early-exit when waste < 2x),
 * then hands off to tdb_allocate_ofs(); expands the file and retries
 * when nothing fits. */
2478 /* allocate some space from the free list. The offset returned points
2479 to a unconnected list_struct within the database with room for at
2480 least length bytes of total data
2482 0 is returned if the space could not be allocated
2484 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2486 tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2488 tdb_off_t rec_ptr, last_ptr;
2492 if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2495 /* Extra bytes required for tailer */
2496 length += sizeof(tdb_off_t);
2499 last_ptr = FREELIST_TOP;
2501 /* read in the freelist top */
2502 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2505 bestfit.rec_ptr = 0;
2506 bestfit.last_ptr = 0;
2507 bestfit.rec_len = 0;
2510 this is a best fit allocation strategy. Originally we used
2511 a first fit strategy, but it suffered from massive fragmentation
2512 issues when faced with a slowly increasing record size.
2515 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2519 if (rec->rec_len >= length) {
2520 if (bestfit.rec_ptr == 0 ||
2521 rec->rec_len < bestfit.rec_len) {
2522 bestfit.rec_len = rec->rec_len;
2523 bestfit.rec_ptr = rec_ptr;
2524 bestfit.last_ptr = last_ptr;
2525 /* consider a fit to be good enough if
2526 we aren't wasting more than half
2528 if (bestfit.rec_len < 2*length) {
2534 /* move to the next record */
2536 rec_ptr = rec->next;
2539 if (bestfit.rec_ptr != 0) {
2540 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2544 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2545 tdb_unlock(tdb, -1, F_WRLCK);
2549 /* we didn't find enough space. See if we can expand the
2550 database and if we can then try again */
2551 if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2554 tdb_unlock(tdb, -1, F_WRLCK);
2558 /* file: freelistcheck.c */
/* NOTE(review): elided listing; tokens kept verbatim (TDB_DATA key/data
 * declarations are elided).  Records rec_ptr in the in-memory "seen"
 * tdb; TDB_INSERT makes the store fail if the offset was seen before,
 * which is how the caller detects freelist loops. */
2560 /* Check the freelist is good and contains no loops.
2561 Very memory intensive - only do this as a consistency
2562 checker. Heh heh - uses an in memory tdb as the storage
2563 for the "seen" record list. For some reason this strikes
2564 me as extremely clever as I don't have to write another tree
2565 data structure implementation :-).
2568 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2572 memset(&data, '\0', sizeof(data));
2573 key.dptr = (unsigned char *)&rec_ptr;
2574 key.dsize = sizeof(rec_ptr);
2575 return tdb_store(mem_tdb, key, data, TDB_INSERT);
/* NOTE(review): elided listing; tokens kept verbatim.  Walks the
 * freelist under the allocator lock, using an internal in-memory tdb
 * plus seen_insert() to flag revisited offsets (loops) as
 * TDB_ERR_CORRUPT; pnum_entries presumably receives the entry count —
 * the counting line is elided, TODO confirm. */
2578 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2580 struct tdb_context *mem_tdb = NULL;
2581 struct list_struct rec;
2582 tdb_off_t rec_ptr, last_ptr;
2587 mem_tdb = tdb_open("flval", tdb->header.hash_size,
2588 TDB_INTERNAL, O_RDWR, 0600);
2593 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2598 last_ptr = FREELIST_TOP;
2600 /* Store the FREELIST_TOP record. */
2601 if (seen_insert(mem_tdb, last_ptr) == -1) {
2602 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2606 /* read in the freelist top */
2607 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2613 /* If we can't store this record (we've seen it
2614 before) then the free list has a loop and must
2617 if (seen_insert(mem_tdb, rec_ptr)) {
2618 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2622 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2626 /* move to the next record */
2637 tdb_unlock(tdb, -1, F_WRLCK);
2641 /* file: traverse.c */
/* NOTE(review): elided listing; tokens kept verbatim.  Traversal
 * engine: advances tlock to the next live record, taking the chain
 * lock and a record lock on the returned offset.  Also opportunistically
 * deletes TDB_DEAD records left by earlier traverses (writable handles
 * only).  Lock ordering here is intricate — do not reorder. */
2643 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2644 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2645 struct list_struct *rec)
2647 int want_next = (tlock->off != 0);
2649 /* Lock each chain from the start one. */
2650 for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2651 if (!tlock->off && tlock->hash != 0) {
2652 /* this is an optimisation for the common case where
2653 the hash chain is empty, which is particularly
2654 common for the use of tdb with ldb, where large
2655 hashes are used. In that case we spend most of our
2656 time in tdb_brlock(), locking empty hash chains.
2658 To avoid this, we do an unlocked pre-check to see
2659 if the hash chain is empty before starting to look
2660 inside it. If it is empty then we can avoid that
2661 hash chain. If it isn't empty then we can't believe
2662 the value we get back, as we read it without a
2663 lock, so instead we get the lock and re-fetch the
2666 Notice that not doing this optimisation on the
2667 first hash chain is critical. We must guarantee
2668 that we have done at least one fcntl lock at the
2669 start of a search to guarantee that memory is
2670 coherent on SMP systems. If records are added by
2671 others during the search then thats OK, and we
2672 could possibly miss those with this trick, but we
2673 could miss them anyway without this trick, so the
2674 semantics don't change.
2676 With a non-indexed ldb search this trick gains us a
2677 factor of around 80 in speed on a linux 2.6.x
2678 system (testing using ldbtest).
2680 tdb->methods->next_hash_chain(tdb, &tlock->hash);
2681 if (tlock->hash == tdb->header.hash_size) {
2686 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2689 /* No previous record? Start at top of chain. */
2691 if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2695 /* Otherwise unlock the previous record. */
2696 if (tdb_unlock_record(tdb, tlock->off) != 0)
2701 /* We have offset of old record: grab next */
2702 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2704 tlock->off = rec->next;
2707 /* Iterate through chain */
2708 while( tlock->off) {
2710 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2713 /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2714 if (tlock->off == rec->next) {
2715 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2719 if (!TDB_DEAD(rec)) {
2720 /* Woohoo: we found one! */
2721 if (tdb_lock_record(tdb, tlock->off) != 0)
2726 /* Try to clean dead ones from old traverses */
2727 current = tlock->off;
2728 tlock->off = rec->next;
2729 if (!(tdb->read_only || tdb->traverse_read) &&
2730 tdb_do_delete(tdb, current, rec) != 0)
2733 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2736 /* We finished iteration without finding anything */
2737 return TDB_ERRCODE(TDB_SUCCESS, 0);
2741 if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2742 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
/* NOTE(review): elided listing; tokens kept verbatim.  Shared body of
 * tdb_traverse()/tdb_traverse_read(): pushes tl onto tdb->travlocks
 * (fcntl locks don't stack), then repeatedly calls tdb_next_lock(),
 * reads key+data in one tdb_alloc_read(), drops the chain lock, and
 * calls fn.  key.dptr is heap-allocated and freed via SAFE_FREE on
 * every path. */
2746 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2747 return -1 on error or the record count traversed
2748 if fn is NULL then it is not called
2749 a non-zero return value from fn() indicates that the traversal should stop
2751 static int tdb_traverse_internal(struct tdb_context *tdb,
2752 tdb_traverse_func fn, void *private_data,
2753 struct tdb_traverse_lock *tl)
2756 struct list_struct rec;
2759 /* This was in the initializaton, above, but the IRIX compiler
2760 * did not like it. crh
2762 tl->next = tdb->travlocks.next;
2764 /* fcntl locks don't stack: beware traverse inside traverse */
2765 tdb->travlocks.next = tl;
2767 /* tdb_next_lock places locks on the record returned, and its chain */
2768 while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2770 /* now read the full record */
2771 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2772 rec.key_len + rec.data_len);
2775 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2777 if (tdb_unlock_record(tdb, tl->off) != 0)
2778 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2781 key.dsize = rec.key_len;
2782 dbuf.dptr = key.dptr + rec.key_len;
2783 dbuf.dsize = rec.data_len;
2785 /* Drop chain lock, call out */
2786 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2788 SAFE_FREE(key.dptr);
2791 if (fn && fn(tdb, key, dbuf, private_data)) {
2792 /* They want us to terminate traversal */
2794 if (tdb_unlock_record(tdb, tl->off) != 0) {
2795 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2798 SAFE_FREE(key.dptr);
2801 SAFE_FREE(key.dptr);
2804 tdb->travlocks.next = tl->next;
/* NOTE(review): elided listing; tokens kept verbatim.  Header comment
 * fixed: this is the READ-style traverse (F_RDLCK, increments
 * tdb->traverse_read so writers are refused for the duration). */
2813 a read style traverse - temporarily marks the db read only
2815 int tdb_traverse_read(struct tdb_context *tdb,
2816 tdb_traverse_func fn, void *private_data)
2818 struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2821 /* we need to get a read lock on the transaction lock here to
2822 cope with the lock ordering semantics of solaris10 */
2823 if (tdb_transaction_lock(tdb, F_RDLCK)) {
2827 tdb->traverse_read++;
2828 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2829 tdb->traverse_read--;
2831 tdb_transaction_unlock(tdb);
/* NOTE(review): elided listing; tokens kept verbatim.  Write-style
 * traverse (F_WRLCK); falls back to tdb_traverse_read() on read-only
 * handles or when already inside a read traverse. */
2837 a write style traverse - needs to get the transaction lock to
2840 int tdb_traverse(struct tdb_context *tdb,
2841 tdb_traverse_func fn, void *private_data)
2843 struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2846 if (tdb->read_only || tdb->traverse_read) {
2847 return tdb_traverse_read(tdb, fn, private_data);
2850 if (tdb_transaction_lock(tdb, F_WRLCK)) {
2854 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2856 tdb_transaction_unlock(tdb);
/* NOTE(review): elided listing; tokens kept verbatim.  Resets the
 * persistent travlocks cursor to the start (read locks), grabs the
 * first live record via tdb_next_lock(), copies out its key, and
 * drops the chain lock while keeping the record lock for
 * tdb_nextkey(). */
2862 /* find the first entry in the database and return its key */
2863 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2866 struct list_struct rec;
2868 /* release any old lock */
2869 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2871 tdb->travlocks.off = tdb->travlocks.hash = 0;
2872 tdb->travlocks.lock_rw = F_RDLCK;
2874 /* Grab first record: locks chain and returned record. */
2875 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2877 /* now read the key */
2878 key.dsize = rec.key_len;
2879 key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2881 /* Unlock the hash chain of the record we just read. */
2882 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2883 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
/* NOTE(review): elided listing; tokens kept verbatim.  Continues a
 * firstkey/nextkey walk: if the cached travlocks cursor still points
 * at oldkey it resumes from there; otherwise it re-finds oldkey by
 * hash and locks it, then steps to the next live record with
 * tdb_next_lock(). */
2887 /* find the next entry in the database, returning its key */
2888 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2891 TDB_DATA key = tdb_null;
2892 struct list_struct rec;
2893 unsigned char *k = NULL;
2895 /* Is locked key the old key? If so, traverse will be reliable. */
2896 if (tdb->travlocks.off) {
2897 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2899 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2900 || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2902 || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2903 /* No, it wasn't: unlock it and start from scratch */
2904 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2908 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2912 tdb->travlocks.off = 0;
2918 if (!tdb->travlocks.off) {
2919 /* No previous element: do normal find, and lock record */
2920 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2921 if (!tdb->travlocks.off)
2923 tdb->travlocks.hash = BUCKET(rec.full_hash);
2924 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2925 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2929 oldhash = tdb->travlocks.hash;
2931 /* Grab next record: locks chain and returned record,
2932 unlocks old record */
2933 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2934 key.dsize = rec.key_len;
2935 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2937 /* Unlock the chain of this new record */
2938 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2939 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2941 /* Unlock the chain of old record */
2942 if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2943 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
/* NOTE(review): elided listing; tokens kept verbatim.  Debug helper:
 * prints one record header and cross-checks its tailer word
 * (rec_len + sizeof(rec)); presumably returns rec.next so
 * tdb_dump_chain() can iterate — return lines are elided, TODO
 * confirm. */
2949 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2952 struct list_struct rec;
2953 tdb_off_t tailer_ofs, tailer;
2955 if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2956 sizeof(rec), DOCONV()) == -1) {
2957 printf("ERROR: failed to read record at %u\n", offset);
2961 printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2962 "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2963 hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2964 rec.full_hash, rec.magic);
2966 tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2968 if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2969 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2973 if (tailer != rec.rec_len + sizeof(rec)) {
2974 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2975 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
/* NOTE(review): elided listing; tokens kept verbatim.  Debug helper:
 * locks chain i (i == -1 is the freelist), dumps each record via
 * tdb_dump_record(), and returns the unlock result. */
2980 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2982 tdb_off_t rec_ptr, top;
2984 top = TDB_HASH_TOP(i);
2986 if (tdb_lock(tdb, i, F_WRLCK) != 0)
2989 if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2990 return tdb_unlock(tdb, i, F_WRLCK);
2993 printf("hash=%d\n", i);
2996 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2999 return tdb_unlock(tdb, i, F_WRLCK);
/* NOTE(review): elided listing; tokens kept verbatim (loop-variable
 * declaration elided).  Dumps every hash chain, then the freelist
 * (chain -1). */
3002 void tdb_dump_all(struct tdb_context *tdb)
3005 for (i=0;i<tdb->header.hash_size;i++) {
3006 tdb_dump_chain(tdb, i);
3008 printf("freelist:\n");
3009 tdb_dump_chain(tdb, -1);
/* NOTE(review): elided listing; tokens kept verbatim.  Debug helper:
 * walks the freelist under the allocator lock, printing each entry,
 * validating TDB_FREE_MAGIC, and summing rec_len into total_free. */
3012 int tdb_printfreelist(struct tdb_context *tdb)
3015 long total_free = 0;
3016 tdb_off_t offset, rec_ptr;
3017 struct list_struct rec;
3019 if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3022 offset = FREELIST_TOP;
3024 /* read in the freelist top */
3025 if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3026 tdb_unlock(tdb, -1, F_WRLCK);
3030 printf("freelist top=[0x%08x]\n", rec_ptr );
3032 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3033 sizeof(rec), DOCONV()) == -1) {
3034 tdb_unlock(tdb, -1, F_WRLCK);
3038 if (rec.magic != TDB_FREE_MAGIC) {
3039 printf("bad magic 0x%08x in free list\n", rec.magic);
3040 tdb_unlock(tdb, -1, F_WRLCK);
3044 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3045 rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3046 total_free += rec.rec_len;
3048 /* move to the next record */
3051 printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3054 return tdb_unlock(tdb, -1, F_WRLCK);
/* NOTE(review): elided listing; tokens kept verbatim (the seqnum
 * increment between the read and write is elided).  Read-modify-write
 * of TDB_SEQNUM_OFS without taking the seqnum byte lock; errors are
 * deliberately ignored.  No-op unless TDB_SEQNUM is set. */
3060 non-blocking increment of the tdb sequence number if the tdb has been opened using
3063 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3067 if (!(tdb->flags & TDB_SEQNUM)) {
3071 /* we ignore errors from this, as we have no sane way of
3074 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3076 tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
/* NOTE(review): elided listing; tokens kept verbatim.  Locked variant:
 * takes a 1-byte write lock at TDB_SEQNUM_OFS, delegates to the
 * nonblock helper, then releases the byte lock. */
3080 increment the tdb sequence number if the tdb has been opened using
3083 static void tdb_increment_seqnum(struct tdb_context *tdb)
3085 if (!(tdb->flags & TDB_SEQNUM)) {
3089 if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3093 tdb_increment_seqnum_nonblock(tdb);
3095 tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3098 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3100 return memcmp(data.dptr, key.dptr, data.dsize);
/* NOTE(review): elided listing; tokens kept verbatim.  Core chain
 * search: compares live records by full hash and key length before
 * paying for a byte-wise key comparison via
 * tdb_parse_data(tdb_key_compare); returns 0 / TDB_ERR_NOEXIST when
 * not found. */
3103 /* Returns 0 on fail. On success, return offset of record, and fills
3105 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3106 struct list_struct *r)
3110 /* read in the hash top */
3111 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3114 /* keep looking until we find the right record */
3116 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3119 if (!TDB_DEAD(r) && hash==r->full_hash
3120 && key.dsize==r->key_len
3121 && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3122 r->key_len, tdb_key_compare,
3128 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
/* NOTE(review): elided listing; tokens kept verbatim.  Locking wrapper
 * around tdb_find(): takes the chain lock first and releases it only
 * when the key is NOT found — on success the caller owns the chain
 * lock. */
3131 /* As tdb_find, but if you succeed, keep the lock */
3132 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3133 struct list_struct *rec)
3137 if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3139 if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3140 tdb_unlock(tdb, BUCKET(hash), locktype);
/* NOTE(review): elided listing; tokens kept verbatim.  In-place update
 * fast path for tdb_store(): only succeeds when the existing record
 * can hold key + new data + tailer; rewrites the header when data_len
 * shrinks. */
3145 /* update an entry in place - this only works if the new data size
3146 is <= the old data size and the key exists.
3147 on failure return -1.
3149 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3151 struct list_struct rec;
3155 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3158 /* must be long enough key, data and tailer */
3159 if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3160 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3164 if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3165 dbuf.dptr, dbuf.dsize) == -1)
3168 if (dbuf.dsize != rec.data_len) {
3170 rec.data_len = dbuf.dsize;
3171 return tdb_rec_write(tdb, rec_ptr, &rec);
/* NOTE(review): elided listing; tokens kept verbatim.  Locates the
 * record under a read chain lock, copies the data portion into a
 * fresh allocation owned by the caller, and drops the lock. */
3177 /* find an entry in the database given a key */
3178 /* If an entry doesn't exist tdb_err will be set to
3179 * TDB_ERR_NOEXIST. If a key has no data attached
3180 * then the TDB_DATA will have zero length but
3181 * a non-zero pointer
3183 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3186 struct list_struct rec;
3190 /* find which hash bucket it is in */
3191 hash = tdb->hash_fn(&key);
3192 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3195 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3197 ret.dsize = rec.data_len;
3198 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
/* NOTE(review): elided listing; tokens kept verbatim.  Zero-copy
 * fetch: runs the caller's parser on the record data under the chain
 * read lock (pointing straight at the mmap when possible, per the
 * original comment). */
3203 * Find an entry in the database and hand the record's data to a parsing
3204 * function. The parsing function is executed under the chain read lock, so it
3205 * should be fast and should not block on other syscalls.
3207 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3209 * For mmapped tdb's that do not have a transaction open it points the parsing
3210 * function directly at the mmap area, it avoids the malloc/memcpy in this
3211 * case. If a transaction is open or no mmap is available, it has to do
3212 * malloc/read/parse/free.
3214 * This is interesting for all readers of potentially large data structures in
3215 * the tdb records, ldb indexes being one example.
3218 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3219 int (*parser)(TDB_DATA key, TDB_DATA data,
3220 void *private_data),
3224 struct list_struct rec;
3228 /* find which hash bucket it is in */
3229 hash = tdb->hash_fn(&key);
3231 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3232 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3235 ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3236 rec.data_len, parser, private_data);
3238 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
/* NOTE(review): elided listing; tokens kept verbatim.  Internal
 * existence test used by tdb_exists()/tdb_store(); releases the chain
 * lock taken by tdb_find_lock_hash() before returning. */
3243 /* check if an entry in the database exists
3245 note that 1 is returned if the key is found and 0 is returned if not found
3246 this doesn't match the conventions in the rest of this module, but is
3247 compatible with gdbm
3249 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3251 struct list_struct rec;
3253 if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3255 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3259 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3261 u32 hash = tdb->hash_fn(&key);
3262 return tdb_exists_hash(tdb, key, hash);
/* NOTE(review): elided listing; tokens kept verbatim.  Physical
 * delete: if a traverser holds the record lock the record is only
 * marked TDB_DEAD_MAGIC; otherwise it is unlinked from its hash chain
 * and the space returned to the freelist via tdb_free(). */
3265 /* actually delete an entry in the database given the offset */
3266 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3268 tdb_off_t last_ptr, i;
3269 struct list_struct lastrec;
3271 if (tdb->read_only || tdb->traverse_read) return -1;
3273 if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3274 /* Someone traversing here: mark it as dead */
3275 rec->magic = TDB_DEAD_MAGIC;
3276 return tdb_rec_write(tdb, rec_ptr, rec);
3278 if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3281 /* find previous record in hash chain */
3282 if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3284 for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3285 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3288 /* unlink it: next ptr is at start of record. */
3290 last_ptr = TDB_HASH_TOP(rec->full_hash);
3291 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3294 /* recover the space */
3295 if (tdb_free(tdb, rec_ptr, rec) == -1)
/* NOTE(review): elided listing; tokens kept verbatim.  Counts
 * TDB_DEAD_MAGIC records in one hash chain; used by tdb_delete_hash()
 * against tdb->max_dead_records. */
3300 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3304 struct list_struct rec;
3306 /* read in the hash top */
3307 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3311 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3314 if (rec.magic == TDB_DEAD_MAGIC) {
/* NOTE(review): elided listing; tokens kept verbatim.  Physically
 * deletes every TDB_DEAD_MAGIC record in one hash chain; takes the
 * allocator lock (-1) because tdb_do_delete() frees into the central
 * freelist. */
3323 * Purge all DEAD records from a hash chain
3325 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3328 struct list_struct rec;
3331 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3335 /* read in the hash top */
3336 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3342 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3348 if (rec.magic == TDB_DEAD_MAGIC
3349 && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3356 tdb_unlock(tdb, -1, F_WRLCK);
/* NOTE(review): elided listing; tokens kept verbatim.  Internal
 * delete: with max_dead_records set it merely marks records dead
 * (purging the chain when the dead count grows too large); otherwise
 * it does a full tdb_do_delete().  Bumps the sequence number on
 * success. */
3360 /* delete an entry in the database given a key */
3361 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3364 struct list_struct rec;
3367 if (tdb->max_dead_records != 0) {
3370 * Allow for some dead records per hash chain, mainly for
3371 * tdb's with a very high create/delete rate like locking.tdb.
3374 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3377 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3379 * Don't let the per-chain freelist grow too large,
3380 * delete all existing dead records
3382 tdb_purge_dead(tdb, hash);
3385 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3386 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3391 * Just mark the record as dead.
3393 rec.magic = TDB_DEAD_MAGIC;
3394 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3397 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3401 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3405 tdb_increment_seqnum(tdb);
3408 if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3409 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3413 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3415 u32 hash = tdb->hash_fn(&key);
3416 return tdb_delete_hash(tdb, key, hash);
/* NOTE(review): elided listing; tokens kept verbatim.  First-fit scan
 * of one hash chain for a TDB_DEAD record with rec_len >= length, so
 * tdb_store() can reuse it without touching the central freelist. */
3420 * See if we have a dead record around with enough space
3422 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3423 struct list_struct *r, tdb_len_t length)
3427 /* read in the hash top */
3428 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3431 /* keep looking until we find the right record */
3433 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3436 if (TDB_DEAD(r) && r->rec_len >= length) {
3438 * First fit for simple coding, TODO: change to best
/* NOTE(review): elided listing; tokens kept verbatim.  Store pipeline
 * under the chain write lock: (1) TDB_INSERT existence check,
 * (2) in-place update fast path, (3) delete-then-reinsert, buffering
 * key+data in a malloc'd copy first so an OOM cannot leave a dead
 * spot, (4) reuse a dead record when max_dead_records allows,
 * (5) otherwise allocate from the freelist (purging dead records
 * while holding the allocator lock) and splice the new record onto
 * the chain head.  Bumps the sequence number on success. */
3448 /* store an element in the database, replacing any existing element
3451 return 0 on success, -1 on failure
3453 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3455 struct list_struct rec;
3461 if (tdb->read_only || tdb->traverse_read) {
3462 tdb->ecode = TDB_ERR_RDONLY;
3466 /* find which hash bucket it is in */
3467 hash = tdb->hash_fn(&key);
3468 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3471 /* check for it existing, on insert. */
3472 if (flag == TDB_INSERT) {
3473 if (tdb_exists_hash(tdb, key, hash)) {
3474 tdb->ecode = TDB_ERR_EXISTS;
3478 /* first try in-place update, on modify or replace. */
3479 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3482 if (tdb->ecode == TDB_ERR_NOEXIST &&
3483 flag == TDB_MODIFY) {
3484 /* if the record doesn't exist and we are in TDB_MODIFY mode then
3485 we should fail the store */
3489 /* reset the error code potentially set by the tdb_update() */
3490 tdb->ecode = TDB_SUCCESS;
3492 /* delete any existing record - if it doesn't exist we don't
3493 care. Doing this first reduces fragmentation, and avoids
3494 coalescing with `allocated' block before it's updated. */
3495 if (flag != TDB_INSERT)
3496 tdb_delete_hash(tdb, key, hash);
3498 /* Copy key+value *before* allocating free space in case malloc
3499 fails and we are left with a dead spot in the tdb. */
3501 if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3502 tdb->ecode = TDB_ERR_OOM;
3506 memcpy(p, key.dptr, key.dsize);
3508 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3510 if (tdb->max_dead_records != 0) {
3512 * Allow for some dead records per hash chain, look if we can
3513 * find one that can hold the new record. We need enough space
3514 * for key, data and tailer. If we find one, we don't have to
3515 * consult the central freelist.
3517 rec_ptr = tdb_find_dead(
3519 key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3522 rec.key_len = key.dsize;
3523 rec.data_len = dbuf.dsize;
3524 rec.full_hash = hash;
3525 rec.magic = TDB_MAGIC;
3526 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3527 || tdb->methods->tdb_write(
3528 tdb, rec_ptr + sizeof(rec),
3529 p, key.dsize + dbuf.dsize) == -1) {
3537 * We have to allocate some space from the freelist, so this means we
3538 * have to lock it. Use the chance to purge all the DEAD records from
3539 * the hash chain under the freelist lock.
3542 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3546 if ((tdb->max_dead_records != 0)
3547 && (tdb_purge_dead(tdb, hash) == -1)) {
3548 tdb_unlock(tdb, -1, F_WRLCK);
3552 /* we have to allocate some space */
3553 rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3555 tdb_unlock(tdb, -1, F_WRLCK);
3561 /* Read hash top into next ptr */
3562 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3565 rec.key_len = key.dsize;
3566 rec.data_len = dbuf.dsize;
3567 rec.full_hash = hash;
3568 rec.magic = TDB_MAGIC;
3570 /* write out and point the top of the hash chain at it */
3571 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3572 || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3573 || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3574 /* Need to tdb_unallocate() here */
3582 tdb_increment_seqnum(tdb);
3586 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
/* NOTE(review): elided listing; tokens kept verbatim.  Read-modify-
 * write append under the chain write lock: fetch existing data (or
 * malloc fresh), grow the buffer with realloc into a temporary so the
 * original is not leaked on failure, concatenate, and store with
 * flag 0 (replace). */
3591 /* Append to an entry. Create if not exist. */
3592 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3598 /* find which hash bucket it is in */
3599 hash = tdb->hash_fn(&key);
3600 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3603 dbuf = tdb_fetch(tdb, key);
3605 if (dbuf.dptr == NULL) {
3606 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3608 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3609 dbuf.dsize + new_dbuf.dsize);
3610 if (new_dptr == NULL) {
3613 dbuf.dptr = new_dptr;
3616 if (dbuf.dptr == NULL) {
3617 tdb->ecode = TDB_ERR_OOM;
3621 memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3622 dbuf.dsize += new_dbuf.dsize;
3624 ret = tdb_store(tdb, key, dbuf, 0);
3627 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3628 SAFE_FREE(dbuf.dptr);
/* NOTE(review): elided listing; tokens kept verbatim (the return
 * statement is elided — presumably returns tdb->name, TODO confirm). */
3634 return the name of the current tdb file
3635 useful for external logging functions
3637 const char *tdb_name(struct tdb_context *tdb)
/* NOTE(review): elided listing; tokens kept verbatim (the return
 * statement is elided — presumably returns tdb->fd, TODO confirm). */
3643 return the underlying file descriptor being used by tdb, or -1
3644 useful for external routines that want to check the device/inode
3647 int tdb_fd(struct tdb_context *tdb)
3653 return the current logging function
3654 useful for external tdb routines that wish to log tdb errors
3656 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3658 return tdb->log.log_fn;
/* NOTE(review): elided listing; tokens kept verbatim (the declaration
 * of seqnum and the return statement are elided). */
3663 get the tdb sequence number. Only makes sense if the writers opened
3664 with TDB_SEQNUM set. Note that this sequence number will wrap quite
3665 quickly, so it should only be used for a 'has something changed'
3666 test, not for code that relies on the count of the number of changes
3667 made. If you want a counter then use a tdb record.
3669 The aim of this sequence number is to allow for a very lightweight
3670 test of a possible tdb change.
3672 int tdb_get_seqnum(struct tdb_context *tdb)
3676 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3680 int tdb_hash_size(struct tdb_context *tdb)
3682 return tdb->header.hash_size;
3685 size_t tdb_map_size(struct tdb_context *tdb)
3687 return tdb->map_size;
/* NOTE(review): elided listing; tokens kept verbatim (the body is
 * elided — presumably returns tdb->flags, TODO confirm). */
3690 int tdb_get_flags(struct tdb_context *tdb)
3697 enable sequence number handling on an open tdb
3699 void tdb_enable_seqnum(struct tdb_context *tdb)
3701 tdb->flags |= TDB_SEQNUM;
/* NOTE(review): elided listing; tokens kept verbatim.  Head of the
 * linked list of open contexts consulted by tdb_already_open(). */
3706 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3707 static struct tdb_context *tdbs = NULL;
/* NOTE(review): elided listing; tokens kept verbatim (the final
 * return expression is elided).  Default key-hash used when the
 * caller supplies no hash_fn. */
3710 /* This is from a hash algorithm suggested by Rogier Wolff */
3711 static unsigned int default_tdb_hash(TDB_DATA *key)
3713 u32 value; /* Used to compute the hash value. */
3714 u32 i; /* Used to cycle through random values. */
3716 /* Set the initial value from the key size. */
3717 for (value = 0, i=0; i < key->dsize; i++)
3718 value = value * 256 + key->dptr[i] + (value >> 24) * 241;
/* NOTE(review): elided listing; tokens kept verbatim.  Builds a fresh
 * header (+ hash table + freelist slot) in memory; TDB_INTERNAL
 * databases adopt the buffer as their map, on-disk databases truncate
 * the file and write the (possibly endian-converted) image out. */
3724 /* initialise a new database with a specified hash size */
3725 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3727 struct tdb_header *newdb;
3730 /* We make it up in memory, then write it out if not internal */
3731 size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3732 if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3733 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3735 /* Fill in the header */
3736 newdb->version = TDB_VERSION;
3737 newdb->hash_size = hash_size;
3738 if (tdb->flags & TDB_INTERNAL) {
3739 tdb->map_size = size;
3740 tdb->map_ptr = (char *)newdb;
3741 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3742 /* Convert the `ondisk' version if asked. */
3746 if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3749 if (ftruncate(tdb->fd, 0) == -1)
3752 /* This creates an endian-converted header, as if read from disk */
3754 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3755 /* Don't endian-convert the magic food! */
3756 memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3757 if (write(tdb->fd, newdb, size) != size) {
/* NOTE(review): elided listing; tokens kept verbatim (the ino
 * parameter and return statements are elided).  Scans the global tdbs
 * list for a context already open on the same device/inode, since
 * fcntl locks do not nest within a process. */
3770 static int tdb_already_open(dev_t device,
3773 struct tdb_context *i;
3775 for (i = tdbs; i; i = i->next) {
3776 if (i->device == device && i->inode == ino) {
3784 /* open the database, creating it if necessary
3786 The open_flags and mode are passed straight to the open call on the
3787 database file. A flags value of O_WRONLY is invalid. The hash size
3788 is advisory, use zero for a default value.
3790 Return is NULL on error, in which case errno is also set. Don't
3791 try to call tdb_error or tdb_errname, just do strerror(errno).
3793 @param name may be NULL for internal databases. */
3794 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3795 int open_flags, mode_t mode)
3797 return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
/* NOTE(review): elided listing; tokens kept verbatim.  Installed by
 * tdb_open_ex() when the caller provides no logging context; the
 * (empty) body is elided.  PRINTF_ATTRIBUTE lets gcc type-check the
 * format arguments. */
3800 /* a default logging function */
3801 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3802 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
/* Extended open: like tdb_open(), but accepts an optional logging
 * context and an optional hash function (NULL selects the default).
 * Returns the new context, or NULL with errno set on failure. */
3807 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3808 int open_flags, mode_t mode,
3809 const struct tdb_logging_context *log_ctx,
3810 tdb_hash_func hash_fn)
3812 struct tdb_context *tdb;
/* rev: nonzero when the on-disk byte order differs from ours;
 * locked: nonzero once we hold the exclusive ACTIVE_LOCK. */
3814 int rev = 0, locked = 0;
/* calloc zero-initialises the whole context */
3818 if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3819 /* Can't log this */
3826 tdb->map_ptr = NULL;
3827 tdb->flags = tdb_flags;
3828 tdb->open_flags = open_flags;
/* copy the caller's logging context when one was supplied ... */
3830 tdb->log = *log_ctx;
/* ... otherwise fall back to the no-op logger */
3832 tdb->log.log_fn = null_log_fn;
3833 tdb->log.log_private = NULL;
3835 tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3837 /* cache the page size */
3838 tdb->page_size = sysconf(_SC_PAGESIZE);
3839 if (tdb->page_size <= 0) {
/* sysconf failed or gave nonsense: assume 8k pages */
3840 tdb->page_size = 0x2000;
/* write-only makes no sense: we must also be able to read the header */
3843 if ((open_flags & O_ACCMODE) == O_WRONLY) {
3844 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
/* a zero hash_size requests the default (per the tdb_open contract) */
3851 hash_size = DEFAULT_HASH_SIZE;
3852 if ((open_flags & O_ACCMODE) == O_RDONLY) {
3854 /* read only databases don't do locking or clear if first */
3855 tdb->flags |= TDB_NOLOCK;
3856 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3859 /* internal databases don't mmap or lock, and start off cleared */
3860 if (tdb->flags & TDB_INTERNAL) {
3861 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3862 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3863 if (tdb_new_database(tdb, hash_size) != 0) {
3864 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3870 if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3871 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3872 name, strerror(errno)));
3873 goto fail; /* errno set by open(2) */
3876 /* ensure there is only one process initialising at once */
3877 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3878 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3879 name, strerror(errno)));
3880 goto fail; /* errno set by tdb_brlock */
3883 /* we need to zero database if we are the only one with it open */
3884 if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3885 (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
/* we hold the exclusive active lock: wipe the file and re-create it */
3886 open_flags |= O_CREAT;
3887 if (ftruncate(tdb->fd, 0) == -1) {
3888 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3889 "failed to truncate %s: %s\n",
3890 name, strerror(errno)));
3891 goto fail; /* errno set by ftruncate */
/* header check: valid magic plus either the native or the
 * byte-reversed version number */
3895 if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3896 || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3897 || (tdb->header.version != TDB_VERSION
3898 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3899 /* it's not a valid database - possibly initialise it */
3900 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3901 errno = EIO; /* ie bad format or something */
3904 rev = (tdb->flags & TDB_CONVERT);
/* detect endianness by reassembling the stored version big-endian-first */
3906 vp = (unsigned char *)&tdb->header.version;
3907 vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3908 (((u32)vp[2]) << 8) | (u32)vp[3];
3909 tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3911 tdb->flags &= ~TDB_CONVERT;
/* on-disk byte order differs from ours: byte-swap from here on */
3913 tdb->flags |= TDB_CONVERT;
3914 tdb_convert(&tdb->header, sizeof(tdb->header));
3916 if (fstat(tdb->fd, &st) == -1)
/* old spinlock-format databases are rejected outright */
3919 if (tdb->header.rwlocks != 0) {
3920 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3924 /* Is it already in the open list? If so, fail. */
3925 if (tdb_already_open(st.st_dev, st.st_ino)) {
3926 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3927 "%s (%d,%d) is already open in this process\n",
3928 name, (int)st.st_dev, (int)st.st_ino));
3933 if (!(tdb->name = (char *)strdup(name))) {
3938 tdb->map_size = st.st_size;
3939 tdb->device = st.st_dev;
3940 tdb->inode = st.st_ino;
3941 tdb->max_dead_records = 0;
/* NOTE(review): despite the "failed to take" wording in the log message
 * below, this call passes F_UNLCK, i.e. it RELEASES the exclusive
 * active lock taken earlier — the message text is misleading. */
3944 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3945 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3946 "failed to take ACTIVE_LOCK on %s: %s\n",
3947 name, strerror(errno)));
3953 /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3954 we didn't get the initial exclusive lock as we need to let all other
3955 users know we're using it. */
3957 if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3958 /* leave this lock in place to indicate it's in use */
3959 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3963 /* if needed, run recovery */
3964 if (tdb_transaction_recover(tdb) == -1) {
3969 /* Internal (memory-only) databases skip all the code above to
3970 * do with disk files, and resume here by releasing their
3971 * global lock and hooking into the active list. */
3972 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
/* fail path: preserve errno across the cleanup calls below */
3979 { int save_errno = errno;
3985 if (tdb->flags & TDB_INTERNAL)
3986 SAFE_FREE(tdb->map_ptr);
3990 SAFE_FREE(tdb->name);
3992 if (close(tdb->fd) != 0)
3993 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
4001 * Set the maximum number of dead records per hash chain
4004 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4006 tdb->max_dead_records = max_dead;
/* Close a database and free all resources associated with it.
 *
 * @returns -1 for error; 0 for success.
 *
 * Cancels any in-progress transaction, frees the map/name/lock records,
 * closes the file descriptor and unlinks the context from the global
 * tdbs list.
 */
4014 int tdb_close(struct tdb_context *tdb)
4016 struct tdb_context **i;
/* an unfinished transaction would leak locks; roll it back first */
4019 if (tdb->transaction) {
4020 tdb_transaction_cancel(tdb);
/* internal (in-memory) databases own map_ptr as heap memory */
4024 if (tdb->flags & TDB_INTERNAL)
4025 SAFE_FREE(tdb->map_ptr);
4029 SAFE_FREE(tdb->name);
4031 ret = close(tdb->fd);
4032 SAFE_FREE(tdb->lockrecs);
4034 /* Remove from contexts list */
4035 for (i = &tdbs; *i; i = &(*i)->next) {
/* poison the struct so any use-after-close is easier to spot */
4042 memset(tdb, 0, sizeof(*tdb));
4048 /* register a loging function */
4049 void tdb_set_logging_function(struct tdb_context *tdb,
4050 const struct tdb_logging_context *log_ctx)
4052 tdb->log = *log_ctx;
4055 void *tdb_get_logging_private(struct tdb_context *tdb)
4057 return tdb->log.log_private;
4060 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4061 seek pointer from our parent and to re-establish locks
 *
 * Returns 0 on success; error paths log and bail out (bodies elided in
 * this excerpt — presumably returning -1; confirm against full source). */
4062 int tdb_reopen(struct tdb_context *tdb)
/* memory-only databases have no fd or locks to refresh */
4066 if (tdb->flags & TDB_INTERNAL) {
4067 return 0; /* Nothing to do. */
/* held locks cannot survive a close/reopen of the fd */
4070 if (tdb->num_locks != 0 || tdb->global_lock.count) {
4071 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4075 if (tdb->transaction != 0) {
4076 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4080 if (tdb_munmap(tdb) != 0) {
4081 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4084 if (close(tdb->fd) != 0)
4085 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
/* mask out O_CREAT|O_TRUNC: the file must already exist and must not
 * be clobbered when we reopen it */
4086 tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4087 if (tdb->fd == -1) {
4088 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
/* re-establish the shared active lock dropped by the close above */
4091 if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4092 (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4093 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4096 if (fstat(tdb->fd, &st) != 0) {
4097 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
/* detect the file having been replaced (e.g. unlinked and re-created)
 * underneath us while it was closed */
4100 if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4101 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4113 /* reopen all tdbs - called after a fork; walks the global contexts list
 * and re-establishes each database's fd and locks via tdb_reopen().
 * @param parent_longlived nonzero when the parent process outlives us */
4114 int tdb_reopen_all(int parent_longlived)
4116 struct tdb_context *tdb;
4118 for (tdb=tdbs; tdb; tdb = tdb->next) {
4120 * If the parent is longlived (ie. a
4121 * parent daemon architecture), we know
4122 * it will keep its active lock on a
4123 * tdb opened with CLEAR_IF_FIRST. Thus
4124 * for child processes we don't have to
4125 * add an active lock. This is essential
4126 * to improve performance on systems that
4127 * keep POSIX locks as a non-scalable data
4128 * structure in the kernel.
4130 if (parent_longlived) {
4131 /* Ensure no clear-if-first. */
4132 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4135 if (tdb_reopen(tdb) != 0)