2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
7 trivial database library - standalone version
9 Copyright (C) Andrew Tridgell 1999-2005
10 Copyright (C) Jeremy Allison 2000-2006
11 Copyright (C) Paul `Rusty' Russell 2000
13 ** NOTE! The following LGPL license applies to the tdb
14 ** library. This does NOT imply that all of Samba is released
17 This library is free software; you can redistribute it and/or
18 modify it under the terms of the GNU Lesser General Public
19 License as published by the Free Software Foundation; either
20 version 2 of the License, or (at your option) any later version.
22 This library is distributed in the hope that it will be useful,
23 but WITHOUT ANY WARRANTY; without even the implied warranty of
24 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25 Lesser General Public License for more details.
27 You should have received a copy of the GNU Lesser General Public
28 License along with this library; if not, write to the Free Software
29 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
32 #ifdef CONFIG_STAND_ALONE
35 #define HAVE_SYS_MMAN_H
39 #define _XOPEN_SOURCE 500
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
52 #include <sys/types.h>
61 #ifdef HAVE_SYS_MMAN_H
70 #define MAP_FAILED ((void *)-1)
74 #define strdup rep_strdup
75 static char *rep_strdup(const char *s)
85 ret = malloc(length + 1);
87 strncpy(ret, s, length);
94 #ifndef PRINTF_ATTRIBUTE
95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1)
96 /** Use gcc attribute to check printf fns. a1 is the 1-based index of
97 * the parameter containing the format, and a2 the index of the first
98 * argument. Note that some gcc 2.x versions don't handle this
100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
102 #define PRINTF_ATTRIBUTE(a1, a2)
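/* Illustrative use (a sketch, not part of the original file): a printf-style
   logging prototype whose format string is parameter 3 and whose first
   variadic argument is parameter 4 would be declared as

	void example_log(struct tdb_context *tdb, int level,
			 const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);

   so that gcc can type-check the format arguments at every call site.
   example_log is a hypothetical name used only for this sketch. */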
114 #ifndef HAVE_GETPAGESIZE
115 #define getpagesize() 0x2000
118 typedef u32 tdb_len_t;
119 typedef u32 tdb_off_t;
122 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
125 #define TDB_MAGIC_FOOD "TDB file\n"
126 #define TDB_VERSION (0x26011967 + 6)
127 #define TDB_MAGIC (0x26011999U)
128 #define TDB_FREE_MAGIC (~TDB_MAGIC)
129 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
130 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
131 #define TDB_ALIGNMENT 4
132 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
133 #define DEFAULT_HASH_SIZE 131
134 #define FREELIST_TOP (sizeof(struct tdb_header))
135 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
136 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
137 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
138 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
139 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
140 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
141 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
142 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
143 #define TDB_SEQNUM_OFS offsetof(struct tdb_header, sequence_number)
144 #define TDB_PAD_BYTE 0x42
145 #define TDB_PAD_U32 0x42424242
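/* A few worked spot-checks of the macros above, written as a throwaway
   self-test (illustrative only, not part of the library; assumes <assert.h>
   and the usual 32-bit u32). */
#include <assert.h>
static void tdb_macro_selftest(void)
{
	assert(TDB_ALIGN(13, TDB_ALIGNMENT) == 16);      /* 13 rounded up to a multiple of 4 */
	assert(TDB_ALIGN(16, TDB_ALIGNMENT) == 16);      /* already aligned: unchanged */
	assert(TDB_BYTEREV(0x11223344U) == 0x44332211U); /* 4-byte endian swap */
}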
147 /* NB assumes there is a local variable called "tdb" that is the
148 * current context, also takes a doubly-parenthesized print-style argument. */
150 #define TDB_LOG(x) tdb->log.log_fn x
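/* Illustrative call (a sketch, not from the original source): with the
   doubly-parenthesized form, a trace message is emitted as

	TDB_LOG((tdb, TDB_DEBUG_TRACE, "opened %s with %u hash buckets\n",
		 tdb->name, tdb->header.hash_size));

   the outer parentheses turn the whole argument list into one macro argument,
   so the printf-style arguments pass straight through to the log function. */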
153 #define GLOBAL_LOCK 0
154 #define ACTIVE_LOCK 4
155 #define TRANSACTION_LOCK 8
157 /* free memory if the pointer is valid and zero the pointer */
159 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
162 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
164 #define DOCONV() (tdb->flags & TDB_CONVERT)
165 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
168 /* the body of the database is made of one list_struct for the free space
169 plus a separate data list for each hash value */
171 tdb_off_t next; /* offset of the next record in the list */
172 tdb_len_t rec_len; /* total byte length of record */
173 tdb_len_t key_len; /* byte length of key */
174 tdb_len_t data_len; /* byte length of data */
175 u32 full_hash; /* the full 32 bit hash of the key */
176 u32 magic; /* try to catch errors */
177 /* the following union is implied:
179 char record[rec_len];
184 u32 totalsize; (tailer)
190 /* this is stored at the front of every database */
192 char magic_food[32]; /* for /etc/magic */
193 u32 version; /* version of the code */
194 u32 hash_size; /* number of hash entries */
195 tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
196 tdb_off_t recovery_start; /* offset of transaction recovery region */
197 tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
198 tdb_off_t reserved[29];
201 struct tdb_lock_type {
207 struct tdb_traverse_lock {
208 struct tdb_traverse_lock *next;
216 int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
217 int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
218 void (*next_hash_chain)(struct tdb_context *, u32 *);
219 int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
220 int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
221 int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
225 char *name; /* the name of the database */
226 void *map_ptr; /* where it is currently mapped */
227 int fd; /* open file descriptor for the database */
228 tdb_len_t map_size; /* how much space has been mapped */
229 int read_only; /* opened read-only */
230 int traverse_read; /* read-only traversal */
231 struct tdb_lock_type global_lock;
233 struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
234 enum TDB_ERROR ecode; /* error code for last tdb error */
235 struct tdb_header header; /* a cached copy of the header */
236 u32 flags; /* the flags passed to tdb_open */
237 struct tdb_traverse_lock travlocks; /* current traversal locks */
238 struct tdb_context *next; /* all tdbs to avoid multiple opens */
239 dev_t device; /* uniquely identifies this tdb */
240 ino_t inode; /* uniquely identifies this tdb */
241 struct tdb_logging_context log;
242 unsigned int (*hash_fn)(TDB_DATA *key);
243 int open_flags; /* flags used in the open - needed by reopen */
244 unsigned int num_locks; /* number of chain locks held */
245 const struct tdb_methods *methods;
246 struct tdb_transaction *transaction;
248 int max_dead_records;
249 bool have_transaction_lock;
256 static int tdb_munmap(struct tdb_context *tdb);
257 static void tdb_mmap(struct tdb_context *tdb);
258 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
259 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
260 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
261 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
262 static int tdb_transaction_unlock(struct tdb_context *tdb);
263 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
264 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
265 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
266 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268 static void *tdb_convert(void *buf, u32 size);
269 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
271 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
273 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
274 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
275 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
277 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
278 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
279 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
280 tdb_off_t offset, tdb_len_t len,
281 int (*parser)(TDB_DATA key, TDB_DATA data,
284 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
285 struct list_struct *rec);
286 static void tdb_io_init(struct tdb_context *tdb);
287 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
288 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
289 struct list_struct *rec);
294 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
299 static struct tdb_errname {
300 enum TDB_ERROR ecode; const char *estring;
301 } emap[] = { {TDB_SUCCESS, "Success"},
302 {TDB_ERR_CORRUPT, "Corrupt database"},
303 {TDB_ERR_IO, "IO Error"},
304 {TDB_ERR_LOCK, "Locking error"},
305 {TDB_ERR_OOM, "Out of memory"},
306 {TDB_ERR_EXISTS, "Record exists"},
307 {TDB_ERR_NOLOCK, "Lock exists on other keys"},
308 {TDB_ERR_EINVAL, "Invalid parameter"},
309 {TDB_ERR_NOEXIST, "Record does not exist"},
310 {TDB_ERR_RDONLY, "write not permitted"} };
312 /* Error string for the last tdb error */
313 const char *tdb_errorstr(struct tdb_context *tdb)
316 for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
317 if (tdb->ecode == emap[i].ecode)
318 return emap[i].estring;
319 return "Invalid error code";
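/* Hedged usage sketch (not part of the original file): how a caller combines
   the public tdb.h API with tdb_error()/tdb_errorstr() above.  Assumes
   <stdio.h> and <fcntl.h> are included; the file name and key contents are
   invented for illustration. */
static int example_report_errors(void)
{
	struct tdb_context *tdb;
	TDB_DATA key, data;

	tdb = tdb_open("example.tdb", 0, TDB_DEFAULT, O_RDWR | O_CREAT, 0600);
	if (tdb == NULL) {
		return -1;
	}
	key.dptr = (unsigned char *)"hello";
	key.dsize = 5;
	data.dptr = (unsigned char *)"world";
	data.dsize = 5;
	if (tdb_store(tdb, key, data, TDB_INSERT) == -1) {
		/* the failing operation set tdb->ecode; map it to text */
		fprintf(stderr, "store failed: %s (code %d)\n",
			tdb_errorstr(tdb), tdb_error(tdb));
	}
	tdb_close(tdb);
	return 0;
}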
324 #define TDB_MARK_LOCK 0x80000000
326 /* a byte range locking function - return 0 on success
327 this function locks/unlocks 1 byte at the specified offset.
329 On error, errno is also set so that errors are passed back properly
332 note that a len of zero means lock to end of file
334 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
335 int rw_type, int lck_type, int probe, size_t len)
340 if (tdb->flags & TDB_NOLOCK) {
344 if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
345 tdb->ecode = TDB_ERR_RDONLY;
350 fl.l_whence = SEEK_SET;
356 ret = fcntl(tdb->fd,lck_type,&fl);
357 } while (ret == -1 && errno == EINTR);
360 /* Generic lock error. errno set by fcntl.
361 * EAGAIN is an expected return from non-blocking
363 if (!probe && lck_type != F_SETLK) {
364 /* Ensure error code is set for log fn to examine. */
365 tdb->ecode = TDB_ERR_LOCK;
366 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
367 tdb->fd, offset, rw_type, lck_type, (int)len));
369 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
376 upgrade a read lock to a write lock. This needs to be handled in a
377 special way as some OSes (such as Solaris) have too conservative
378 deadlock detection and claim a deadlock when progress can be
379 made. For those OSes we may loop for a while.
381 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
386 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
389 if (errno != EDEADLK) {
392 /* sleep for as short a time as we can - more portable than usleep() */
395 select(0, NULL, NULL, NULL, &tv);
397 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
402 /* lock a list in the database. list -1 is the alloc list */
403 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
405 struct tdb_lock_type *new_lck;
407 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
409 ltype &= ~TDB_MARK_LOCK;
411 /* a global lock allows us to avoid per chain locks */
412 if (tdb->global_lock.count &&
413 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
417 if (tdb->global_lock.count) {
418 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
421 if (list < -1 || list >= (int)tdb->header.hash_size) {
422 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
426 if (tdb->flags & TDB_NOLOCK)
429 for (i=0; i<tdb->num_lockrecs; i++) {
430 if (tdb->lockrecs[i].list == list) {
431 if (tdb->lockrecs[i].count == 0) {
433 * Can't happen, see tdb_unlock(). It should
436 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
437 "lck->count == 0 for list %d", list));
440 * Just increment the in-memory struct, posix locks
443 tdb->lockrecs[i].count++;
448 new_lck = (struct tdb_lock_type *)realloc(
450 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
451 if (new_lck == NULL) {
455 tdb->lockrecs = new_lck;
457 /* Since fcntl locks don't nest, we do a lock for the first one,
458 and simply bump the count for future ones */
460 tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
467 tdb->lockrecs[tdb->num_lockrecs].list = list;
468 tdb->lockrecs[tdb->num_lockrecs].count = 1;
469 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
470 tdb->num_lockrecs += 1;
475 /* lock a list in the database. list -1 is the alloc list */
476 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
479 ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
481 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
482 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
487 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
488 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
490 return _tdb_lock(tdb, list, ltype, F_SETLK);
494 /* unlock the database: returns void because it's too late for errors. */
495 /* changed to return int; it may be interesting to know there
496 has been an error --simo */
497 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
501 struct tdb_lock_type *lck = NULL;
502 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
504 ltype &= ~TDB_MARK_LOCK;
506 /* a global lock allows us to avoid per chain locks */
507 if (tdb->global_lock.count &&
508 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
512 if (tdb->global_lock.count) {
513 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
516 if (tdb->flags & TDB_NOLOCK)
520 if (list < -1 || list >= (int)tdb->header.hash_size) {
521 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
525 for (i=0; i<tdb->num_lockrecs; i++) {
526 if (tdb->lockrecs[i].list == list) {
527 lck = &tdb->lockrecs[i];
532 if ((lck == NULL) || (lck->count == 0)) {
533 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
537 if (lck->count > 1) {
543 * This lock has count==1 left, so we need to unlock it in the
544 * kernel. We don't bother with decrementing the in-memory array
545 * element, we're about to overwrite it with the last array element
552 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
558 * Shrink the array by overwriting the element just unlocked with the
559 * last array element.
562 if (tdb->num_lockrecs > 1) {
563 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
565 tdb->num_lockrecs -= 1;
568 * We don't bother with realloc when the array shrinks, but if we have
569 * a completely idle tdb we should get rid of the locked array.
572 if (tdb->num_lockrecs == 0) {
573 SAFE_FREE(tdb->lockrecs);
577 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
582 get the transaction lock
584 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
586 if (tdb->have_transaction_lock || tdb->global_lock.count) {
589 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
590 F_SETLKW, 0, 1) == -1) {
591 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
592 tdb->ecode = TDB_ERR_LOCK;
595 tdb->have_transaction_lock = 1;
600 release the transaction lock
602 int tdb_transaction_unlock(struct tdb_context *tdb)
605 if (!tdb->have_transaction_lock) {
608 ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
610 tdb->have_transaction_lock = 0;
618 /* lock/unlock entire database */
619 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
621 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
623 ltype &= ~TDB_MARK_LOCK;
625 /* There are no locks on read-only dbs */
626 if (tdb->read_only || tdb->traverse_read)
627 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
629 if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
630 tdb->global_lock.count++;
634 if (tdb->global_lock.count) {
635 /* a global lock of a different type exists */
636 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
639 if (tdb->num_locks != 0) {
640 /* can't combine global and chain locks */
641 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
645 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
646 0, 4*tdb->header.hash_size)) {
647 if (op == F_SETLKW) {
648 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
653 tdb->global_lock.count = 1;
654 tdb->global_lock.ltype = ltype;
661 /* unlock entire db */
662 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
664 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
666 ltype &= ~TDB_MARK_LOCK;
668 /* There are no locks on read-only dbs */
669 if (tdb->read_only || tdb->traverse_read) {
670 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
673 if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
674 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
677 if (tdb->global_lock.count > 1) {
678 tdb->global_lock.count--;
683 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
684 0, 4*tdb->header.hash_size)) {
685 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
689 tdb->global_lock.count = 0;
690 tdb->global_lock.ltype = 0;
695 /* lock entire database with write lock */
696 int tdb_lockall(struct tdb_context *tdb)
698 return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
701 /* lock entire database with write lock - mark only */
702 int tdb_lockall_mark(struct tdb_context *tdb)
704 return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
707 /* unlock entire database with write lock - unmark only */
708 int tdb_lockall_unmark(struct tdb_context *tdb)
710 return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
713 /* lock entire database with write lock - non-blocking variant */
714 int tdb_lockall_nonblock(struct tdb_context *tdb)
716 return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
719 /* unlock entire database with write lock */
720 int tdb_unlockall(struct tdb_context *tdb)
722 return _tdb_unlockall(tdb, F_WRLCK);
725 /* lock entire database with read lock */
726 int tdb_lockall_read(struct tdb_context *tdb)
728 return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
731 /* lock entire database with read lock - non-blocking variant */
732 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
734 return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
737 /* unlock entire database with read lock */
738 int tdb_unlockall_read(struct tdb_context *tdb)
740 return _tdb_unlockall(tdb, F_RDLCK);
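/* Hedged usage sketch (illustrative, not original code): taking a consistent
   read-only view of two related keys with the whole-database read lock above.
   Assumes tdb.h and <stdlib.h>; the keys are supplied by the caller. */
static int example_snapshot(struct tdb_context *tdb, TDB_DATA key1, TDB_DATA key2)
{
	TDB_DATA a, b;

	if (tdb_lockall_read(tdb) == -1) {
		return -1;
	}
	/* no writer can change the database between these two fetches */
	a = tdb_fetch(tdb, key1);
	b = tdb_fetch(tdb, key2);
	tdb_unlockall_read(tdb);

	/* ... use a and b as a single consistent view ... */
	free(a.dptr);
	free(b.dptr);
	return 0;
}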
743 /* lock/unlock one hash chain. This is meant to be used to reduce
744 contention - it cannot guarantee how many records will be locked */
745 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
747 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
750 /* lock/unlock one hash chain, non-blocking. This is meant to be used
751 to reduce contention - it cannot guarantee how many records will be
753 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
755 return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
758 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
759 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
761 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
764 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
765 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
767 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
770 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
772 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
775 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
777 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
780 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
782 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
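/* Hedged usage sketch (illustrative, not original code): the intended
   read-modify-write pattern around a single key, serialised by the chain
   lock functions above.  The counter encoding is invented; assumes tdb.h,
   <stdlib.h> and <string.h>. */
static int example_increment(struct tdb_context *tdb, TDB_DATA key)
{
	TDB_DATA val;
	u32 counter = 0;
	int ret;

	if (tdb_chainlock(tdb, key) == -1) {
		return -1;
	}
	val = tdb_fetch(tdb, key);
	if (val.dptr != NULL && val.dsize == sizeof(counter)) {
		memcpy(&counter, val.dptr, sizeof(counter));
	}
	free(val.dptr);
	counter++;
	val.dptr = (unsigned char *)&counter;
	val.dsize = sizeof(counter);
	ret = tdb_store(tdb, key, val, TDB_REPLACE);
	tdb_chainunlock(tdb, key);
	return ret;
}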
787 /* record lock stops delete underneath */
788 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
790 return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
794 Write locks override our own fcntl readlocks, so check it here.
795 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
796 an error to fail to get the lock here.
798 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
800 struct tdb_traverse_lock *i;
801 for (i = &tdb->travlocks; i; i = i->next)
804 return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
808 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
809 an error to fail to get the lock here.
811 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
813 return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
816 /* fcntl locks don't stack: avoid unlocking someone else's */
817 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
819 struct tdb_traverse_lock *i;
824 for (i = &tdb->travlocks; i; i = i->next)
827 return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
832 /* check for an out of bounds access - if it is out of bounds then
833 see if the database has been expanded by someone else and expand it if necessary;
835 note that "len" is the minimum length needed for the db
837 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
840 if (len <= tdb->map_size)
842 if (tdb->flags & TDB_INTERNAL) {
844 /* Ensure ecode is set for log fn. */
845 tdb->ecode = TDB_ERR_IO;
846 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
847 (int)len, (int)tdb->map_size));
849 return TDB_ERRCODE(TDB_ERR_IO, -1);
852 if (fstat(tdb->fd, &st) == -1) {
853 return TDB_ERRCODE(TDB_ERR_IO, -1);
856 if (st.st_size < (size_t)len) {
858 /* Ensure ecode is set for log fn. */
859 tdb->ecode = TDB_ERR_IO;
860 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
861 (int)len, (int)st.st_size));
863 return TDB_ERRCODE(TDB_ERR_IO, -1);
866 /* Unmap, update size, remap */
867 if (tdb_munmap(tdb) == -1)
868 return TDB_ERRCODE(TDB_ERR_IO, -1);
869 tdb->map_size = st.st_size;
874 /* write a lump of data at a specified offset */
875 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
876 const void *buf, tdb_len_t len)
882 if (tdb->read_only || tdb->traverse_read) {
883 tdb->ecode = TDB_ERR_RDONLY;
887 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
891 memcpy(off + (char *)tdb->map_ptr, buf, len);
892 } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
893 /* Ensure ecode is set for log fn. */
894 tdb->ecode = TDB_ERR_IO;
895 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
896 off, len, strerror(errno)));
897 return TDB_ERRCODE(TDB_ERR_IO, -1);
902 /* Endian conversion: we only ever deal with 4 byte quantities */
903 void *tdb_convert(void *buf, u32 size)
905 u32 i, *p = (u32 *)buf;
906 for (i = 0; i < size / 4; i++)
907 p[i] = TDB_BYTEREV(p[i]);
912 /* read a lump of data at a specified offset, maybe convert */
913 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
914 tdb_len_t len, int cv)
916 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
921 memcpy(buf, off + (char *)tdb->map_ptr, len);
923 ssize_t ret = pread(tdb->fd, buf, len, off);
924 if (ret != (ssize_t)len) {
925 /* Ensure ecode is set for log fn. */
926 tdb->ecode = TDB_ERR_IO;
927 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
928 "len=%d ret=%d (%s) map_size=%d\n",
929 (int)off, (int)len, (int)ret, strerror(errno),
930 (int)tdb->map_size));
931 return TDB_ERRCODE(TDB_ERR_IO, -1);
935 tdb_convert(buf, len);
943 do an unlocked scan of the hash table heads to find the next non-zero head. The value
944 will then be confirmed with the lock held
946 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
950 for (;h < tdb->header.hash_size;h++) {
951 if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
957 for (;h < tdb->header.hash_size;h++) {
958 if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
967 int tdb_munmap(struct tdb_context *tdb)
969 if (tdb->flags & TDB_INTERNAL)
974 int ret = munmap(tdb->map_ptr, tdb->map_size);
983 void tdb_mmap(struct tdb_context *tdb)
985 if (tdb->flags & TDB_INTERNAL)
989 if (!(tdb->flags & TDB_NOMMAP)) {
990 tdb->map_ptr = mmap(NULL, tdb->map_size,
991 PROT_READ|(tdb->read_only? 0:PROT_WRITE),
992 MAP_SHARED|MAP_FILE, tdb->fd, 0);
995 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
998 if (tdb->map_ptr == MAP_FAILED) {
1000 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1001 tdb->map_size, strerror(errno)));
1004 tdb->map_ptr = NULL;
1007 tdb->map_ptr = NULL;
1011 /* expand a file. we prefer to use ftruncate, as that is what posix
1012 says to use for mmap expansion */
1013 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1017 if (tdb->read_only || tdb->traverse_read) {
1018 tdb->ecode = TDB_ERR_RDONLY;
1022 if (ftruncate(tdb->fd, size+addition) == -1) {
1024 if (pwrite(tdb->fd, &b, 1, (size+addition) - 1) != 1) {
1025 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1026 size+addition, strerror(errno)));
1031 /* now fill the file with something. This ensures that the
1032 file isn't sparse, which would be very bad if we ran out of
1033 disk. This must be done with write, not via mmap */
1034 memset(buf, TDB_PAD_BYTE, sizeof(buf));
1036 int n = addition>sizeof(buf)?sizeof(buf):addition;
1037 int ret = pwrite(tdb->fd, buf, n, size);
1039 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1040 n, strerror(errno)));
1050 /* expand the database at least size bytes by expanding the underlying
1051 file and doing the mmap again if necessary */
1052 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1054 struct list_struct rec;
1057 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1058 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1062 /* must know about any previous expansions by another process */
1063 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1065 /* always make room for at least 10 more records, and round
1066 the database up to a multiple of the page size */
1067 size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1069 if (!(tdb->flags & TDB_INTERNAL))
1073 * We must ensure the file is unmapped before doing this
1074 * to ensure consistency with systems like OpenBSD where
1075 * writes and mmaps are not consistent.
1078 /* expand the file itself */
1079 if (!(tdb->flags & TDB_INTERNAL)) {
1080 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1084 tdb->map_size += size;
1086 if (tdb->flags & TDB_INTERNAL) {
1087 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1090 tdb->map_size -= size;
1093 tdb->map_ptr = new_map_ptr;
1096 * We must ensure the file is remapped before adding the space
1097 * to ensure consistency with systems like OpenBSD where
1098 * writes and mmaps are not consistent.
1101 /* We're ok if the mmap fails as we'll fallback to read/write */
1105 /* form a new freelist record */
1106 memset(&rec,'\0',sizeof(rec));
1107 rec.rec_len = size - sizeof(rec);
1109 /* link it into the free list */
1110 offset = tdb->map_size - size;
1111 if (tdb_free(tdb, offset, &rec) == -1)
1114 tdb_unlock(tdb, -1, F_WRLCK);
1117 tdb_unlock(tdb, -1, F_WRLCK);
1121 /* read/write a tdb_off_t */
1122 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1124 return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1127 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1130 return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1134 /* read a lump of data, allocating the space for it */
1135 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1139 /* some systems don't like zero length malloc */
1144 if (!(buf = (unsigned char *)malloc(len))) {
1145 /* Ensure ecode is set for log fn. */
1146 tdb->ecode = TDB_ERR_OOM;
1147 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1148 len, strerror(errno)));
1149 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1151 if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1158 /* Give a piece of tdb data to a parser */
1160 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1161 tdb_off_t offset, tdb_len_t len,
1162 int (*parser)(TDB_DATA key, TDB_DATA data,
1163 void *private_data),
1171 if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1173 * Optimize by avoiding the malloc/memcpy/free, point the
1174 * parser directly at the mmap area.
1176 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1179 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1180 return parser(key, data, private_data);
1183 if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1187 result = parser(key, data, private_data);
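/* Hedged usage sketch (illustrative, not original code): the public wrapper
   tdb_parse_record() declared in tdb.h drives tdb_parse_data() above, so the
   parser may be handed a pointer straight into the mmap area and must copy
   anything it wants to keep.  The parser below is invented for illustration. */
static int example_parser(TDB_DATA key, TDB_DATA data, void *private_data)
{
	size_t *size = (size_t *)private_data;
	*size = data.dsize;	/* only inspect the data; do not keep the pointer */
	return 0;
}

static int example_record_size(struct tdb_context *tdb, TDB_DATA key, size_t *size)
{
	*size = 0;
	return tdb_parse_record(tdb, key, example_parser, size);
}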
1192 /* read/write a record */
1193 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1195 if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1197 if (TDB_BAD_MAGIC(rec)) {
1198 /* Ensure ecode is set for log fn. */
1199 tdb->ecode = TDB_ERR_CORRUPT;
1200 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1201 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1203 return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1206 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1208 struct list_struct r = *rec;
1209 return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1212 static const struct tdb_methods io_methods = {
1215 tdb_next_hash_chain,
1222 initialise the default methods table
1224 void tdb_io_init(struct tdb_context *tdb)
1226 tdb->methods = &io_methods;
1229 /* file: transaction.c */
1234 - only allow a single transaction at a time per database. This makes
1235 using the transaction API simpler, as otherwise the caller would
1236 have to cope with temporary failures in transactions that conflict
1237 with other current transactions
1239 - keep the transaction recovery information in the same file as the
1240 database, using a special 'transaction recovery' record pointed at
1241 by the header. This removes the need for extra journal files as
1242 used by some other databases
1244 - dynamically allocate the transaction recovery record, re-using it
1245 for subsequent transactions. If a larger record is needed then
1246 tdb_free() the old record to place it on the normal tdb freelist
1247 before allocating the new record
1249 - during transactions, keep a linked list of all writes that have
1250 been performed by intercepting all tdb_write() calls. The hooked
1251 transaction versions of tdb_read() and tdb_write() check this
1252 linked list and try to use the elements of the list in preference
1253 to the real database.
1255 - don't allow any locks to be held when a transaction starts,
1256 otherwise we can end up with deadlock (plus lack of lock nesting
1257 in posix locks would mean the lock is lost)
1259 - if the caller gains a lock during the transaction but doesn't
1260 release it then fail the commit
1262 - allow for nested calls to tdb_transaction_start(), re-using the
1263 existing transaction record. If the inner transaction is cancelled
1264 then a subsequent commit will fail
1266 - keep a mirrored copy of the tdb hash chain heads to allow for the
1267 fast hash heads scan on traverse, updating the mirrored copy in
1268 the transaction version of tdb_write
1270 - allow callers to mix transaction and non-transaction use of tdb,
1271 although once a transaction is started then an exclusive lock is
1272 gained until the transaction is committed or cancelled
1274 - the commit strategy involves first saving away all modified data
1275 into a linearised buffer in the transaction recovery area, then
1276 marking the transaction recovery area with a magic value to
1277 indicate a valid recovery record. In total 4 fsync/msync calls are
1278 needed per commit to prevent race conditions. It might be possible
1279 to reduce this to 3 or even 2 with some more work.
1281 - check for a valid recovery record on open of the tdb, while the
1282 global lock is held. Automatically recover from the transaction
1283 recovery area if needed, then continue with the open as
1284 usual. This allows for smooth crash recovery with no administrator intervention.
1287 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1288 still available, but no transaction recovery area is used and no
1289 fsync/msync calls are made.
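/* Hedged usage sketch (illustrative, not original code): how a caller drives
   the transaction design described above through the public tdb.h API.  Both
   stores become visible atomically at commit; on any error the transaction is
   cancelled.  Keys and values are supplied by the caller. */
static int example_atomic_update(struct tdb_context *tdb,
				 TDB_DATA key1, TDB_DATA key2, TDB_DATA val)
{
	if (tdb_transaction_start(tdb) == -1) {
		return -1;
	}
	if (tdb_store(tdb, key1, val, TDB_REPLACE) == -1 ||
	    tdb_store(tdb, key2, val, TDB_REPLACE) == -1) {
		tdb_transaction_cancel(tdb);
		return -1;
	}
	/* commit writes the recovery area, syncs, then applies the changes */
	return tdb_transaction_commit(tdb);
}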
1293 struct tdb_transaction_el {
1294 struct tdb_transaction_el *next, *prev;
1297 unsigned char *data;
1301 hold the context of any current transaction
1303 struct tdb_transaction {
1304 /* we keep a mirrored copy of the tdb hash heads here so
1305 tdb_next_hash_chain() can operate efficiently */
1308 /* the original io methods - used to do IOs to the real db */
1309 const struct tdb_methods *io_methods;
1311 /* the list of transaction elements. We use a doubly linked
1312 list with a last pointer to allow us to keep the list
1313 ordered, with first element at the front of the list. It
1314 needs to be doubly linked as the read/write traversals need
1315 to be backwards, while the commit needs to be forwards */
1316 struct tdb_transaction_el *elements, *elements_last;
1318 /* non-zero when an internal transaction error has
1319 occurred. All write operations will then fail until the
1320 transaction is ended */
1321 int transaction_error;
1323 /* when inside a transaction we need to keep track of any
1324 nested tdb_transaction_start() calls, as these are allowed,
1325 but don't create a new transaction */
1328 /* old file size before transaction */
1329 tdb_len_t old_map_size;
1334 read while in a transaction. We need to check first if the data is in our list
1335 of transaction elements, then if not do a real read
1337 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1338 tdb_len_t len, int cv)
1340 struct tdb_transaction_el *el;
1342 /* we need to walk the list backwards to get the most recent data */
1343 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1346 if (off+len <= el->offset) {
1349 if (off >= el->offset + el->length) {
1353 /* an overlapping read - needs to be split into up to
1354 2 reads and a memcpy */
1355 if (off < el->offset) {
1356 partial = el->offset - off;
1357 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1362 buf = (void *)(partial + (char *)buf);
1364 if (off + len <= el->offset + el->length) {
1367 partial = el->offset + el->length - off;
1369 memcpy(buf, el->data + (off - el->offset), partial);
1371 tdb_convert(buf, len);
1375 buf = (void *)(partial + (char *)buf);
1377 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1384 /* it's not in the transaction elements - do a real read */
1385 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1388 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1389 tdb->ecode = TDB_ERR_IO;
1390 tdb->transaction->transaction_error = 1;
1396 write while in a transaction
1398 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1399 const void *buf, tdb_len_t len)
1401 struct tdb_transaction_el *el, *best_el=NULL;
1407 /* if the write is to a hash head, then update the transaction
1409 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1410 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1411 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1412 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1415 /* first see if we can replace an existing entry */
1416 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1419 if (best_el == NULL && off == el->offset+el->length) {
1423 if (off+len <= el->offset) {
1426 if (off >= el->offset + el->length) {
1430 /* an overlapping write - needs to be split into up to
1431 2 writes and a memcpy */
1432 if (off < el->offset) {
1433 partial = el->offset - off;
1434 if (transaction_write(tdb, off, buf, partial) != 0) {
1439 buf = (const void *)(partial + (const char *)buf);
1441 if (off + len <= el->offset + el->length) {
1444 partial = el->offset + el->length - off;
1446 memcpy(el->data + (off - el->offset), buf, partial);
1449 buf = (const void *)(partial + (const char *)buf);
1451 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1458 /* see if we can append the new entry to an existing entry */
1459 if (best_el && best_el->offset + best_el->length == off &&
1460 (off+len < tdb->transaction->old_map_size ||
1461 off > tdb->transaction->old_map_size)) {
1462 unsigned char *data = best_el->data;
1464 el->data = (unsigned char *)realloc(el->data,
1466 if (el->data == NULL) {
1467 tdb->ecode = TDB_ERR_OOM;
1468 tdb->transaction->transaction_error = 1;
1473 memcpy(el->data + el->length, buf, len);
1475 memset(el->data + el->length, TDB_PAD_BYTE, len);
1481 /* add a new entry at the end of the list */
1482 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1484 tdb->ecode = TDB_ERR_OOM;
1485 tdb->transaction->transaction_error = 1;
1489 el->prev = tdb->transaction->elements_last;
1492 el->data = (unsigned char *)malloc(len);
1493 if (el->data == NULL) {
1495 tdb->ecode = TDB_ERR_OOM;
1496 tdb->transaction->transaction_error = 1;
1500 memcpy(el->data, buf, len);
1502 memset(el->data, TDB_PAD_BYTE, len);
1505 el->prev->next = el;
1507 tdb->transaction->elements = el;
1509 tdb->transaction->elements_last = el;
1513 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1514 tdb->ecode = TDB_ERR_IO;
1515 tdb->transaction->transaction_error = 1;
1520 accelerated hash chain head search, using the cached hash heads
1522 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1525 for (;h < tdb->header.hash_size;h++) {
1526 /* the +1 takes account of the freelist */
1527 if (0 != tdb->transaction->hash_heads[h+1]) {
1535 out of bounds check during a transaction
1537 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1539 if (len <= tdb->map_size) {
1542 return TDB_ERRCODE(TDB_ERR_IO, -1);
1546 transaction version of tdb_expand().
1548 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1551 /* add a write to the transaction elements, so subsequent
1552 reads see the zero data */
1553 if (transaction_write(tdb, size, NULL, addition) != 0) {
1561 brlock during a transaction - ignore them
1563 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1564 int rw_type, int lck_type, int probe, size_t len)
1569 static const struct tdb_methods transaction_methods = {
1572 transaction_next_hash_chain,
1574 transaction_expand_file,
1580 start a tdb transaction. No token is returned, as only a single
1581 transaction is allowed to be pending per tdb_context
1583 int tdb_transaction_start(struct tdb_context *tdb)
1585 /* some sanity checks */
1586 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1587 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1588 tdb->ecode = TDB_ERR_EINVAL;
1592 /* cope with nested tdb_transaction_start() calls */
1593 if (tdb->transaction != NULL) {
1594 tdb->transaction->nesting++;
1595 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1596 tdb->transaction->nesting));
1600 if (tdb->num_locks != 0 || tdb->global_lock.count) {
1601 /* the caller must not have any locks when starting a
1602 transaction as otherwise we'll be screwed by lack
1603 of nested locks in posix */
1604 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1605 tdb->ecode = TDB_ERR_LOCK;
1609 if (tdb->travlocks.next != NULL) {
1610 /* you cannot use transactions inside a traverse (although you can use
1611 traverse inside a transaction) as otherwise you can end up with deadlock */
1613 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1614 tdb->ecode = TDB_ERR_LOCK;
1618 tdb->transaction = (struct tdb_transaction *)
1619 calloc(sizeof(struct tdb_transaction), 1);
1620 if (tdb->transaction == NULL) {
1621 tdb->ecode = TDB_ERR_OOM;
1625 /* get the transaction write lock. This is a blocking lock. As
1626 discussed with Volker, there are a number of ways we could
1627 make this async, which we will probably do in the future */
1628 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1629 SAFE_FREE(tdb->transaction);
1633 /* get a read lock from the freelist to the end of file. This
1634 is upgraded to a write lock during the commit */
1635 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1636 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1637 tdb->ecode = TDB_ERR_LOCK;
1641 /* setup a copy of the hash table heads so the hash scan in
1642 traverse can be fast */
1643 tdb->transaction->hash_heads = (u32 *)
1644 calloc(tdb->header.hash_size+1, sizeof(u32));
1645 if (tdb->transaction->hash_heads == NULL) {
1646 tdb->ecode = TDB_ERR_OOM;
1649 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1650 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1651 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1652 tdb->ecode = TDB_ERR_IO;
1656 /* make sure we know about any file expansions already done by
1658 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1659 tdb->transaction->old_map_size = tdb->map_size;
1661 /* finally hook the io methods, replacing them with
1662 transaction specific methods */
1663 tdb->transaction->io_methods = tdb->methods;
1664 tdb->methods = &transaction_methods;
1666 /* by calling this transaction write here, we ensure that we don't grow the
1667 transaction linked list due to hash table updates */
1668 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1669 TDB_HASHTABLE_SIZE(tdb)) != 0) {
1670 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1671 tdb->ecode = TDB_ERR_IO;
1672 tdb->methods = tdb->transaction->io_methods;
1679 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1680 tdb_transaction_unlock(tdb);
1681 SAFE_FREE(tdb->transaction->hash_heads);
1682 SAFE_FREE(tdb->transaction);
1688 cancel the current transaction
1690 int tdb_transaction_cancel(struct tdb_context *tdb)
1692 if (tdb->transaction == NULL) {
1693 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1697 if (tdb->transaction->nesting != 0) {
1698 tdb->transaction->transaction_error = 1;
1699 tdb->transaction->nesting--;
1703 tdb->map_size = tdb->transaction->old_map_size;
1705 /* free all the transaction elements */
1706 while (tdb->transaction->elements) {
1707 struct tdb_transaction_el *el = tdb->transaction->elements;
1708 tdb->transaction->elements = el->next;
1713 /* remove any global lock created during the transaction */
1714 if (tdb->global_lock.count != 0) {
1715 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1716 tdb->global_lock.count = 0;
1719 /* remove any locks created during the transaction */
1720 if (tdb->num_locks != 0) {
1722 for (i=0;i<tdb->num_lockrecs;i++) {
1723 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1724 F_UNLCK,F_SETLKW, 0, 1);
1727 tdb->num_lockrecs = 0;
1728 SAFE_FREE(tdb->lockrecs);
1731 /* restore the normal io methods */
1732 tdb->methods = tdb->transaction->io_methods;
1734 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1735 tdb_transaction_unlock(tdb);
1736 SAFE_FREE(tdb->transaction->hash_heads);
1737 SAFE_FREE(tdb->transaction);
1745 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1747 if (fsync(tdb->fd) != 0) {
1748 tdb->ecode = TDB_ERR_IO;
1749 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1754 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1755 if (msync(moffset + (char *)tdb->map_ptr,
1756 length + (offset - moffset), MS_SYNC) != 0) {
1757 tdb->ecode = TDB_ERR_IO;
1758 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1769 work out how much space the linearised recovery data will consume
1771 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1773 struct tdb_transaction_el *el;
1774 tdb_len_t recovery_size = 0;
1776 recovery_size = sizeof(u32);
1777 for (el=tdb->transaction->elements;el;el=el->next) {
1778 if (el->offset >= tdb->transaction->old_map_size) {
1781 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1784 return recovery_size;
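/* Layout of the linearised recovery data counted above, as inferred from
   tdb_recovery_size() and transaction_setup_recovery() (sizes assume 4-byte
   u32/tdb_off_t):

	[struct list_struct header]   rec_len = max area size, key_len = old_map_size
	for each element below old_map_size:
		[u32 offset][u32 length][length bytes of the OLD data at offset]
	[u32 tailer]                  = sizeof(rec) + recovery_max_size

   so recovery_size is 4 bytes for the tailer plus 8 + el->length per element. */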
1788 allocate the recovery area, or use an existing recovery area if it is
1791 static int tdb_recovery_allocate(struct tdb_context *tdb,
1792 tdb_len_t *recovery_size,
1793 tdb_off_t *recovery_offset,
1794 tdb_len_t *recovery_max_size)
1796 struct list_struct rec;
1797 const struct tdb_methods *methods = tdb->transaction->io_methods;
1798 tdb_off_t recovery_head;
1800 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1801 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1807 if (recovery_head != 0 &&
1808 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1809 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1813 *recovery_size = tdb_recovery_size(tdb);
1815 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1816 /* it fits in the existing area */
1817 *recovery_max_size = rec.rec_len;
1818 *recovery_offset = recovery_head;
1822 /* we need to free up the old recovery area, then allocate a
1823 new one at the end of the file. Note that we cannot use
1824 tdb_allocate() to allocate the new one as that might return
1825 us an area that is currently being used (as of the start of the transaction) */
1827 if (recovery_head != 0) {
1828 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1829 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1834 /* the tdb_free() call might have increased the recovery size */
1835 *recovery_size = tdb_recovery_size(tdb);
1837 /* round up to a multiple of page size */
1838 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1839 *recovery_offset = tdb->map_size;
1840 recovery_head = *recovery_offset;
1842 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1843 (tdb->map_size - tdb->transaction->old_map_size) +
1844 sizeof(rec) + *recovery_max_size) == -1) {
1845 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1849 /* remap the file (if using mmap) */
1850 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1852 /* we have to reset the old map size so that we don't try to expand the file
1853 again in the transaction commit, which would destroy the recovery area */
1854 tdb->transaction->old_map_size = tdb->map_size;
1856 /* write the recovery header offset and sync - we can sync without a race here
1857 as the magic ptr in the recovery record has not been set */
1858 CONVERT(recovery_head);
1859 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1860 &recovery_head, sizeof(tdb_off_t)) == -1) {
1861 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1870 setup the recovery data that will be used on a crash during commit
1872 static int transaction_setup_recovery(struct tdb_context *tdb,
1873 tdb_off_t *magic_offset)
1875 struct tdb_transaction_el *el;
1876 tdb_len_t recovery_size;
1877 unsigned char *data, *p;
1878 const struct tdb_methods *methods = tdb->transaction->io_methods;
1879 struct list_struct *rec;
1880 tdb_off_t recovery_offset, recovery_max_size;
1881 tdb_off_t old_map_size = tdb->transaction->old_map_size;
1885 check that the recovery area has enough space
1887 if (tdb_recovery_allocate(tdb, &recovery_size,
1888 &recovery_offset, &recovery_max_size) == -1) {
1892 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1894 tdb->ecode = TDB_ERR_OOM;
1898 rec = (struct list_struct *)data;
1899 memset(rec, 0, sizeof(*rec));
1902 rec->data_len = recovery_size;
1903 rec->rec_len = recovery_max_size;
1904 rec->key_len = old_map_size;
1907 /* build the recovery data into a single blob to allow us to do a single
1908 large write, which should be more efficient */
1909 p = data + sizeof(*rec);
1910 for (el=tdb->transaction->elements;el;el=el->next) {
1911 if (el->offset >= old_map_size) {
1914 if (el->offset + el->length > tdb->transaction->old_map_size) {
1915 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1917 tdb->ecode = TDB_ERR_CORRUPT;
1920 memcpy(p, &el->offset, 4);
1921 memcpy(p+4, &el->length, 4);
1925 /* the recovery area contains the old data, not the
1926 new data, so we have to call the original tdb_read
1928 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1930 tdb->ecode = TDB_ERR_IO;
1933 p += 8 + el->length;
1936 /* and the tailer */
1937 tailer = sizeof(*rec) + recovery_max_size;
1938 memcpy(p, &tailer, 4);
1941 /* write the recovery data to the recovery area */
1942 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1943 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1945 tdb->ecode = TDB_ERR_IO;
1949 /* as we don't have ordered writes, we have to sync the recovery
1950 data before we update the magic to indicate that the recovery data is present */
1952 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1959 magic = TDB_RECOVERY_MAGIC;
1962 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1964 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1965 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1966 tdb->ecode = TDB_ERR_IO;
1970 /* ensure the recovery magic marker is on disk */
1971 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1979 commit the current transaction
1981 int tdb_transaction_commit(struct tdb_context *tdb)
1983 const struct tdb_methods *methods;
1984 tdb_off_t magic_offset = 0;
1987 if (tdb->transaction == NULL) {
1988 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1992 if (tdb->transaction->transaction_error) {
1993 tdb->ecode = TDB_ERR_IO;
1994 tdb_transaction_cancel(tdb);
1995 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1999 if (tdb->transaction->nesting != 0) {
2000 tdb->transaction->nesting--;
2004 /* check for a null transaction */
2005 if (tdb->transaction->elements == NULL) {
2006 tdb_transaction_cancel(tdb);
2010 methods = tdb->transaction->io_methods;
2012 /* if there are any locks pending then the caller has not
2013 nested their locks properly, so fail the transaction */
2014 if (tdb->num_locks || tdb->global_lock.count) {
2015 tdb->ecode = TDB_ERR_LOCK;
2016 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2017 tdb_transaction_cancel(tdb);
2021 /* upgrade the main transaction lock region to a write lock */
2022 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2023 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
2024 tdb->ecode = TDB_ERR_LOCK;
2025 tdb_transaction_cancel(tdb);
2029 /* get the global lock - this prevents new users attaching to the database
2030 during the commit */
2031 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2032 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2033 tdb->ecode = TDB_ERR_LOCK;
2034 tdb_transaction_cancel(tdb);
2038 if (!(tdb->flags & TDB_NOSYNC)) {
2039 /* write the recovery data to the end of the file */
2040 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2041 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2042 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2043 tdb_transaction_cancel(tdb);
2048 /* expand the file to the new size if needed */
2049 if (tdb->map_size != tdb->transaction->old_map_size) {
2050 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2052 tdb->transaction->old_map_size) == -1) {
2053 tdb->ecode = TDB_ERR_IO;
2054 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2055 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2056 tdb_transaction_cancel(tdb);
2059 tdb->map_size = tdb->transaction->old_map_size;
2060 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2063 /* perform all the writes */
2064 while (tdb->transaction->elements) {
2065 struct tdb_transaction_el *el = tdb->transaction->elements;
2067 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2068 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2070 /* we've overwritten part of the data and
2071 possibly expanded the file, so we need to
2072 run the crash recovery code */
2073 tdb->methods = methods;
2074 tdb_transaction_recover(tdb);
2076 tdb_transaction_cancel(tdb);
2077 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2079 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2082 tdb->transaction->elements = el->next;
2087 if (!(tdb->flags & TDB_NOSYNC)) {
2088 /* ensure the new data is on disk */
2089 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2093 /* remove the recovery marker */
2094 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2095 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2099 /* ensure the recovery marker has been removed on disk */
2100 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2105 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2108 TODO: maybe write to some dummy hdr field, or write to magic
2109 offset without mmap, before the last sync, instead of the
2113 /* on some systems (like Linux 2.6.x) changes via mmap/msync
2114 don't change the mtime of the file, this means the file may
2115 not be backed up (as tdb rounding to block sizes means that
2116 file size changes are quite rare too). The following forces
2117 mtime changes when a transaction completes */
2119 utime(tdb->name, NULL);
2122 /* use a transaction cancel to free memory and remove the
2123 transaction locks */
2124 tdb_transaction_cancel(tdb);
2130 recover from an aborted transaction. Must be called with exclusive
2131 database write access already established (including the global
2132 lock to prevent new processes attaching)
2134 int tdb_transaction_recover(struct tdb_context *tdb)
2136 tdb_off_t recovery_head, recovery_eof;
2137 unsigned char *data, *p;
2139 struct list_struct rec;
2141 /* find the recovery area */
2142 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2143 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2144 tdb->ecode = TDB_ERR_IO;
2148 if (recovery_head == 0) {
2149 /* we have never allocated a recovery record */
2153 /* read the recovery record */
2154 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2155 sizeof(rec), DOCONV()) == -1) {
2156 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2157 tdb->ecode = TDB_ERR_IO;
2161 if (rec.magic != TDB_RECOVERY_MAGIC) {
2162 /* there is no valid recovery data */
2166 if (tdb->read_only) {
2167 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2168 tdb->ecode = TDB_ERR_CORRUPT;
2172 recovery_eof = rec.key_len;
2174 data = (unsigned char *)malloc(rec.data_len);
2176 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2177 tdb->ecode = TDB_ERR_OOM;
2181 /* read the full recovery data */
2182 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2183 rec.data_len, 0) == -1) {
2184 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2185 tdb->ecode = TDB_ERR_IO;
2189 /* recover the file data */
2191 while (p+8 < data + rec.data_len) {
2197 memcpy(&len, p+4, 4);
2199 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2201 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2202 tdb->ecode = TDB_ERR_IO;
2210 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2211 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2212 tdb->ecode = TDB_ERR_IO;
2216 /* if the recovery area is after the recovered eof then remove it */
2217 if (recovery_eof <= recovery_head) {
2218 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2219 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2220 tdb->ecode = TDB_ERR_IO;
2225 /* remove the recovery magic */
2226 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2228 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2229 tdb->ecode = TDB_ERR_IO;
2233 /* reduce the file size to the old size */
2235 if (ftruncate(tdb->fd, recovery_eof) != 0) {
2236 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2237 tdb->ecode = TDB_ERR_IO;
2240 tdb->map_size = recovery_eof;
2243 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2244 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2245 tdb->ecode = TDB_ERR_IO;
2249 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
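/*
  Illustrative sketch (not part of tdb itself): the recovery area walked by
  tdb_transaction_recover() above is a flat byte stream of
  (4-byte offset, 4-byte length, data) chunks packed after the list_struct
  header. A minimal walker over such a buffer, assuming this file's types and
  <string.h> are in scope; "chunk_fn" is a hypothetical callback.
*/
static void recovery_walk_sketch(const unsigned char *data, tdb_len_t data_len,
                                 void (*chunk_fn)(tdb_off_t ofs, u32 len,
                                                  const unsigned char *bytes))
{
	const unsigned char *p = data;

	while (p + 8 < data + data_len) {
		u32 ofs, len;

		/* 4 bytes of file offset, then 4 bytes of chunk length */
		memcpy(&ofs, p, 4);
		memcpy(&len, p + 4, 4);

		/* the saved bytes follow the 8 byte chunk header */
		chunk_fn(ofs, len, p + 8);
		p += 8 + len;
	}
}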
2256 /* file: freelist.c */
2258 /* read a freelist record and check for simple errors */
2259 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2261 if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2264 if (rec->magic == TDB_MAGIC) {
2265 /* this happens when an app is shut down while deleting a record - we should
2266 not completely fail when this happens */
2267 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2269 rec->magic = TDB_FREE_MAGIC;
2270 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2274 if (rec->magic != TDB_FREE_MAGIC) {
2275 /* Ensure ecode is set for log fn. */
2276 tdb->ecode = TDB_ERR_CORRUPT;
2277 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2279 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2281 if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2288 /* Remove an element from the freelist. Must have alloc lock. */
2289 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2291 tdb_off_t last_ptr, i;
2293 /* read in the freelist top */
2294 last_ptr = FREELIST_TOP;
2295 while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2297 /* We've found it! */
2298 return tdb_ofs_write(tdb, last_ptr, &next);
2300 /* Follow chain (next offset is at start of record) */
2303 TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2304 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2308 /* update a record tailer (must hold allocation lock) */
2309 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2310 const struct list_struct *rec)
2312 tdb_off_t totalsize;
2314 /* Offset of tailer from record header */
2315 totalsize = sizeof(*rec) + rec->rec_len;
2316 return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
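/*
  Illustrative sketch (not part of tdb itself): the record layout assumed by
  update_tailer() above and by the left-merge in tdb_free() below is

      [ list_struct header | key | data | padding | tailer ]

  where the tailer is a tdb_off_t holding sizeof(header) + rec_len. Reading the
  4 bytes just before a record therefore gives the total size of its left
  neighbour. The helper name below is hypothetical.
*/
static tdb_off_t tailer_ofs_sketch(tdb_off_t record_ofs, const struct list_struct *rec)
{
	/* same arithmetic as update_tailer() */
	return record_ofs + sizeof(*rec) + rec->rec_len - sizeof(tdb_off_t);
}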
2320 /* Add an element into the freelist. Merge adjacent records if
2322 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2324 tdb_off_t right, left;
2326 /* Allocation and tailer lock */
2327 if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2330 /* set an initial tailer, so if we fail we don't leave a bogus record */
2331 if (update_tailer(tdb, offset, rec) != 0) {
2332 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2336 /* Look right first (I'm an Australian, dammit) */
2337 right = offset + sizeof(*rec) + rec->rec_len;
2338 if (right + sizeof(*rec) <= tdb->map_size) {
2339 struct list_struct r;
2341 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2342 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2346 /* If it's free, expand to include it. */
2347 if (r.magic == TDB_FREE_MAGIC) {
2348 if (remove_from_freelist(tdb, right, r.next) == -1) {
2349 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2352 rec->rec_len += sizeof(r) + r.rec_len;
2358 left = offset - sizeof(tdb_off_t);
2359 if (left > TDB_DATA_START(tdb->header.hash_size)) {
2360 struct list_struct l;
2363 /* Read in tailer and jump back to header */
2364 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2365 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2369 /* it could be uninitialised data */
2370 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2374 left = offset - leftsize;
2376 /* Now read in record */
2377 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2378 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2382 /* If it's free, expand to include it. */
2383 if (l.magic == TDB_FREE_MAGIC) {
2384 if (remove_from_freelist(tdb, left, l.next) == -1) {
2385 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2389 rec->rec_len += leftsize;
2395 if (update_tailer(tdb, offset, rec) == -1) {
2396 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2400 /* Now, prepend to free list */
2401 rec->magic = TDB_FREE_MAGIC;
2403 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2404 tdb_rec_write(tdb, offset, rec) == -1 ||
2405 tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2406 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2410 /* And we're done. */
2411 tdb_unlock(tdb, -1, F_WRLCK);
2415 tdb_unlock(tdb, -1, F_WRLCK);
2421 the core of tdb_allocate - called when we have decided which
2422 free list entry to use
2424 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2425 struct list_struct *rec, tdb_off_t last_ptr)
2427 struct list_struct newrec;
2428 tdb_off_t newrec_ptr;
2430 memset(&newrec, '\0', sizeof(newrec));
2432 /* found it - now possibly split it up */
2433 if (rec->rec_len > length + MIN_REC_SIZE) {
2434 /* Length of left piece */
2435 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2437 /* Right piece to go on free list */
2438 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2439 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2441 /* And left record is shortened */
2442 rec->rec_len = length;
2447 /* Remove allocated record from the free list */
2448 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2452 /* Update header: do this before we drop alloc
2453 lock, otherwise tdb_free() might try to
2454 merge with us, thinking we're free.
2455 (Thanks Jeremy Allison). */
2456 rec->magic = TDB_MAGIC;
2457 if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2461 /* Did we create a new block? */
2463 /* Update allocated record tailer (we
2465 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2469 /* Free new record */
2470 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2475 /* all done - return the new record offset */
2479 /* allocate some space from the free list. The offset returned points
2480 to an unconnected list_struct within the database with room for at
2481 least length bytes of total data
2483 0 is returned if the space could not be allocated
2485 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2487 tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2489 tdb_off_t rec_ptr, last_ptr;
2493 if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2496 /* Extra bytes required for tailer */
2497 length += sizeof(tdb_off_t);
2500 last_ptr = FREELIST_TOP;
2502 /* read in the freelist top */
2503 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2506 bestfit.rec_ptr = 0;
2507 bestfit.last_ptr = 0;
2508 bestfit.rec_len = 0;
2511 this is a best fit allocation strategy. Originally we used
2512 a first fit strategy, but it suffered from massive fragmentation
2513 issues when faced with a slowly increasing record size.
2516 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2520 if (rec->rec_len >= length) {
2521 if (bestfit.rec_ptr == 0 ||
2522 rec->rec_len < bestfit.rec_len) {
2523 bestfit.rec_len = rec->rec_len;
2524 bestfit.rec_ptr = rec_ptr;
2525 bestfit.last_ptr = last_ptr;
2526 /* consider a fit to be good enough if
2527 we aren't wasting more than half
2529 if (bestfit.rec_len < 2*length) {
2535 /* move to the next record */
2537 rec_ptr = rec->next;
2540 if (bestfit.rec_ptr != 0) {
2541 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2545 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2546 tdb_unlock(tdb, -1, F_WRLCK);
2550 /* we didn't find enough space. See if we can expand the
2551 database and if we can then try again */
2552 if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2555 tdb_unlock(tdb, -1, F_WRLCK);
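/*
  Illustrative sketch (not part of tdb itself): the "good enough" rule used by
  the best-fit scan in tdb_allocate() above - a candidate is taken immediately
  once it is large enough but wastes less than half of its space. The helper
  name is hypothetical.
*/
static int good_enough_fit_sketch(tdb_len_t rec_len, tdb_len_t wanted)
{
	return rec_len >= wanted && rec_len < 2 * wanted;
}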
2559 /* file: freelistcheck.c */
2561 /* Check the freelist is good and contains no loops.
2562 Very memory intensive - only do this as a consistency
2563 checker. Heh heh - uses an in memory tdb as the storage
2564 for the "seen" record list. For some reason this strikes
2565 me as extremely clever as I don't have to write another tree
2566 data structure implementation :-).
2569 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2573 memset(&data, '\0', sizeof(data));
2574 key.dptr = (unsigned char *)&rec_ptr;
2575 key.dsize = sizeof(rec_ptr);
2576 return tdb_store(mem_tdb, key, data, TDB_INSERT);
2579 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2581 struct tdb_context *mem_tdb = NULL;
2582 struct list_struct rec;
2583 tdb_off_t rec_ptr, last_ptr;
2588 mem_tdb = tdb_open("flval", tdb->header.hash_size,
2589 TDB_INTERNAL, O_RDWR, 0600);
2594 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2599 last_ptr = FREELIST_TOP;
2601 /* Store the FREELIST_TOP record. */
2602 if (seen_insert(mem_tdb, last_ptr) == -1) {
2603 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2607 /* read in the freelist top */
2608 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2614 /* If we can't store this record (we've seen it
2615 before) then the free list has a loop and must
2618 if (seen_insert(mem_tdb, rec_ptr)) {
2619 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2623 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2627 /* move to the next record */
2638 tdb_unlock(tdb, -1, F_WRLCK);
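/*
  Illustrative sketch (not part of tdb itself): calling the freelist checker
  from application code. "db" is assumed to be an already-open tdb_context and
  <stdio.h> to be in scope, as elsewhere in this file.
*/
static void check_freelist_sketch(struct tdb_context *db)
{
	int entries = 0;

	if (tdb_validate_freelist(db, &entries) == -1) {
		fprintf(stderr, "freelist check failed\n");
	} else {
		printf("freelist ok, %d entries\n", entries);
	}
}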
2642 /* file: traverse.c */
2644 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2645 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2646 struct list_struct *rec)
2648 int want_next = (tlock->off != 0);
2650 /* Lock each chain from the start one. */
2651 for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2652 if (!tlock->off && tlock->hash != 0) {
2653 /* this is an optimisation for the common case where
2654 the hash chain is empty, which is particularly
2655 common for the use of tdb with ldb, where large
2656 hashes are used. In that case we spend most of our
2657 time in tdb_brlock(), locking empty hash chains.
2659 To avoid this, we do an unlocked pre-check to see
2660 if the hash chain is empty before starting to look
2661 inside it. If it is empty then we can avoid that
2662 hash chain. If it isn't empty then we can't believe
2663 the value we get back, as we read it without a
2664 lock, so instead we get the lock and re-fetch the
2667 Notice that not doing this optimisation on the
2668 first hash chain is critical. We must guarantee
2669 that we have done at least one fcntl lock at the
2670 start of a search to guarantee that memory is
2671 coherent on SMP systems. If records are added by
2672 others during the search then that's OK, and we
2673 could possibly miss those with this trick, but we
2674 could miss them anyway without this trick, so the
2675 semantics don't change.
2677 With a non-indexed ldb search this trick gains us a
2678 factor of around 80 in speed on a linux 2.6.x
2679 system (testing using ldbtest).
2681 tdb->methods->next_hash_chain(tdb, &tlock->hash);
2682 if (tlock->hash == tdb->header.hash_size) {
2687 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2690 /* No previous record? Start at top of chain. */
2692 if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2696 /* Otherwise unlock the previous record. */
2697 if (tdb_unlock_record(tdb, tlock->off) != 0)
2702 /* We have offset of old record: grab next */
2703 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2705 tlock->off = rec->next;
2708 /* Iterate through chain */
2709 while (tlock->off) {
2711 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2714 /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2715 if (tlock->off == rec->next) {
2716 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2720 if (!TDB_DEAD(rec)) {
2721 /* Woohoo: we found one! */
2722 if (tdb_lock_record(tdb, tlock->off) != 0)
2727 /* Try to clean dead ones from old traverses */
2728 current = tlock->off;
2729 tlock->off = rec->next;
2730 if (!(tdb->read_only || tdb->traverse_read) &&
2731 tdb_do_delete(tdb, current, rec) != 0)
2734 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2737 /* We finished iteration without finding anything */
2738 return TDB_ERRCODE(TDB_SUCCESS, 0);
2742 if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2743 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2747 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2748 return -1 on error or the record count traversed
2749 if fn is NULL then it is not called
2750 a non-zero return value from fn() indicates that the traversal should stop
2752 static int tdb_traverse_internal(struct tdb_context *tdb,
2753 tdb_traverse_func fn, void *private_data,
2754 struct tdb_traverse_lock *tl)
2757 struct list_struct rec;
2760 /* This was in the initialization, above, but the IRIX compiler
2761 * did not like it. crh
2763 tl->next = tdb->travlocks.next;
2765 /* fcntl locks don't stack: beware traverse inside traverse */
2766 tdb->travlocks.next = tl;
2768 /* tdb_next_lock places locks on the record returned, and its chain */
2769 while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2771 /* now read the full record */
2772 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2773 rec.key_len + rec.data_len);
2776 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2778 if (tdb_unlock_record(tdb, tl->off) != 0)
2779 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2782 key.dsize = rec.key_len;
2783 dbuf.dptr = key.dptr + rec.key_len;
2784 dbuf.dsize = rec.data_len;
2786 /* Drop chain lock, call out */
2787 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2789 SAFE_FREE(key.dptr);
2792 if (fn && fn(tdb, key, dbuf, private_data)) {
2793 /* They want us to terminate traversal */
2795 if (tdb_unlock_record(tdb, tl->off) != 0) {
2796 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2799 SAFE_FREE(key.dptr);
2802 SAFE_FREE(key.dptr);
2805 tdb->travlocks.next = tl->next;
2814 a read style traverse - temporarily marks the db read only
2816 int tdb_traverse_read(struct tdb_context *tdb,
2817 tdb_traverse_func fn, void *private_data)
2819 struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2822 /* we need to get a read lock on the transaction lock here to
2823 cope with the lock ordering semantics of solaris10 */
2824 if (tdb_transaction_lock(tdb, F_RDLCK)) {
2828 tdb->traverse_read++;
2829 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2830 tdb->traverse_read--;
2832 tdb_transaction_unlock(tdb);
2838 a write style traverse - needs to get the transaction lock to
2841 int tdb_traverse(struct tdb_context *tdb,
2842 tdb_traverse_func fn, void *private_data)
2844 struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2847 if (tdb->read_only || tdb->traverse_read) {
2848 return tdb_traverse_read(tdb, fn, private_data);
2851 if (tdb_transaction_lock(tdb, F_WRLCK)) {
2855 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2857 tdb_transaction_unlock(tdb);
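/*
  Illustrative sketch (not part of tdb itself): a traversal callback. The
  callback signature matches the fn(tdb, key, dbuf, private_data) call in
  tdb_traverse_internal() above; returning non-zero stops the traversal.
  "count_entry" and the use of private_data as a counter are hypothetical.
*/
static int count_entry(struct tdb_context *the_tdb, TDB_DATA key, TDB_DATA dbuf,
                       void *private_data)
{
	int *count = (int *)private_data;

	(void)the_tdb; (void)key; (void)dbuf;
	(*count)++;
	return 0;		/* keep going */
}

static int count_records_sketch(struct tdb_context *db)
{
	int count = 0;

	if (tdb_traverse_read(db, count_entry, &count) == -1) {
		return -1;
	}
	return count;
}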
2863 /* find the first entry in the database and return its key */
2864 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2867 struct list_struct rec;
2869 /* release any old lock */
2870 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2872 tdb->travlocks.off = tdb->travlocks.hash = 0;
2873 tdb->travlocks.lock_rw = F_RDLCK;
2875 /* Grab first record: locks chain and returned record. */
2876 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2878 /* now read the key */
2879 key.dsize = rec.key_len;
2880 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off + sizeof(rec), key.dsize);
2882 /* Unlock the hash chain of the record we just read. */
2883 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2884 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2888 /* find the next entry in the database, returning its key */
2889 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2892 TDB_DATA key = tdb_null;
2893 struct list_struct rec;
2894 unsigned char *k = NULL;
2896 /* Is locked key the old key? If so, traverse will be reliable. */
2897 if (tdb->travlocks.off) {
2898 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2900 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2901 || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2903 || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2904 /* No, it wasn't: unlock it and start from scratch */
2905 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2909 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2913 tdb->travlocks.off = 0;
2919 if (!tdb->travlocks.off) {
2920 /* No previous element: do normal find, and lock record */
2921 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2922 if (!tdb->travlocks.off)
2924 tdb->travlocks.hash = BUCKET(rec.full_hash);
2925 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2926 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2930 oldhash = tdb->travlocks.hash;
2932 /* Grab next record: locks chain and returned record,
2933 unlocks old record */
2934 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2935 key.dsize = rec.key_len;
2936 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2938 /* Unlock the chain of this new record */
2939 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2940 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2942 /* Unlock the chain of old record */
2943 if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2944 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
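/*
  Illustrative sketch (not part of tdb itself): walking every key with
  tdb_firstkey()/tdb_nextkey(). Both return malloc'ed key data that the caller
  must free; a NULL dptr marks the end of the database.
*/
static void walk_keys_sketch(struct tdb_context *db)
{
	TDB_DATA key, next;

	for (key = tdb_firstkey(db); key.dptr != NULL; key = next) {
		/* ... use key.dptr / key.dsize here ... */
		next = tdb_nextkey(db, key);
		free(key.dptr);
	}
}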
2950 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2953 struct list_struct rec;
2954 tdb_off_t tailer_ofs, tailer;
2956 if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2957 sizeof(rec), DOCONV()) == -1) {
2958 printf("ERROR: failed to read record at %u\n", offset);
2962 printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2963 "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2964 hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2965 rec.full_hash, rec.magic);
2967 tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2969 if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2970 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2974 if (tailer != rec.rec_len + sizeof(rec)) {
2975 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2976 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2981 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2983 tdb_off_t rec_ptr, top;
2985 top = TDB_HASH_TOP(i);
2987 if (tdb_lock(tdb, i, F_WRLCK) != 0)
2990 if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2991 return tdb_unlock(tdb, i, F_WRLCK);
2994 printf("hash=%d\n", i);
2997 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
3000 return tdb_unlock(tdb, i, F_WRLCK);
3003 void tdb_dump_all(struct tdb_context *tdb)
3006 for (i=0;i<tdb->header.hash_size;i++) {
3007 tdb_dump_chain(tdb, i);
3009 printf("freelist:\n");
3010 tdb_dump_chain(tdb, -1);
3013 int tdb_printfreelist(struct tdb_context *tdb)
3016 long total_free = 0;
3017 tdb_off_t offset, rec_ptr;
3018 struct list_struct rec;
3020 if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3023 offset = FREELIST_TOP;
3025 /* read in the freelist top */
3026 if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3027 tdb_unlock(tdb, -1, F_WRLCK);
3031 printf("freelist top=[0x%08x]\n", rec_ptr );
3033 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3034 sizeof(rec), DOCONV()) == -1) {
3035 tdb_unlock(tdb, -1, F_WRLCK);
3039 if (rec.magic != TDB_FREE_MAGIC) {
3040 printf("bad magic 0x%08x in free list\n", rec.magic);
3041 tdb_unlock(tdb, -1, F_WRLCK);
3045 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3046 rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3047 total_free += rec.rec_len;
3049 /* move to the next record */
3052 printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3055 return tdb_unlock(tdb, -1, F_WRLCK);
3063 non-blocking increment of the tdb sequence number if the tdb has been opened using
3066 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3070 if (!(tdb->flags & TDB_SEQNUM)) {
3074 /* we ignore errors from this, as we have no sane way of
3077 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3079 tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3083 increment the tdb sequence number if the tdb has been opened using
3086 static void tdb_increment_seqnum(struct tdb_context *tdb)
3088 if (!(tdb->flags & TDB_SEQNUM)) {
3092 if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3096 tdb_increment_seqnum_nonblock(tdb);
3098 tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3101 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3103 return memcmp(data.dptr, key.dptr, data.dsize);
3106 /* Returns 0 on fail. On success, return offset of record, and fills
3108 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3109 struct list_struct *r)
3113 /* read in the hash top */
3114 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3117 /* keep looking until we find the right record */
3119 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3122 if (!TDB_DEAD(r) && hash==r->full_hash
3123 && key.dsize==r->key_len
3124 && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3125 r->key_len, tdb_key_compare,
3131 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3134 /* As tdb_find, but if you succeed, keep the lock */
3135 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3136 struct list_struct *rec)
3140 if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3142 if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3143 tdb_unlock(tdb, BUCKET(hash), locktype);
3148 /* update an entry in place - this only works if the new data size
3149 is <= the old data size and the key exists.
3150 on failure return -1.
3152 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3154 struct list_struct rec;
3158 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3161 /* must be long enough key, data and tailer */
3162 if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3163 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3167 if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3168 dbuf.dptr, dbuf.dsize) == -1)
3171 if (dbuf.dsize != rec.data_len) {
3173 rec.data_len = dbuf.dsize;
3174 return tdb_rec_write(tdb, rec_ptr, &rec);
3180 /* find an entry in the database given a key */
3181 /* If an entry doesn't exist tdb_err will be set to
3182 * TDB_ERR_NOEXIST. If a key has no data attached
3183 * then the TDB_DATA will have zero length but
3184 * a non-zero pointer
3186 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3189 struct list_struct rec;
3193 /* find which hash bucket it is in */
3194 hash = tdb->hash_fn(&key);
3195 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3198 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3200 ret.dsize = rec.data_len;
3201 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
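/*
  Illustrative sketch (not part of tdb itself): fetching a value by key. The
  returned dptr is malloc'ed and must be freed by the caller; a NULL dptr means
  the lookup failed. The literal key is hypothetical.
*/
static void fetch_sketch(struct tdb_context *db)
{
	TDB_DATA key, val;

	key.dptr = (unsigned char *)"example-key";
	key.dsize = strlen("example-key");

	val = tdb_fetch(db, key);
	if (val.dptr == NULL) {
		return;		/* not found (or an error - see tdb->ecode) */
	}
	/* ... use val.dptr / val.dsize ... */
	free(val.dptr);
}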
3206 * Find an entry in the database and hand the record's data to a parsing
3207 * function. The parsing function is executed under the chain read lock, so it
3208 * should be fast and should not block on other syscalls.
3210 * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3212 * For mmapped tdb's that do not have a transaction open, it points the parsing
3213 * function directly at the mmap area, avoiding the malloc/memcpy in this
3214 * case. If a transaction is open or no mmap is available, it has to do
3215 * malloc/read/parse/free.
3217 * This is interesting for all readers of potentially large data structures in
3218 * the tdb records, ldb indexes being one example.
3221 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3222 int (*parser)(TDB_DATA key, TDB_DATA data,
3223 void *private_data),
3227 struct list_struct rec;
3231 /* find which hash bucket it is in */
3232 hash = tdb->hash_fn(&key);
3234 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3235 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3238 ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3239 rec.data_len, parser, private_data);
3241 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
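/*
  Illustrative sketch (not part of tdb itself): using tdb_parse_record() to
  look at a record without copying it. The trailing argument is assumed to be
  the private_data pointer handed to the parser; "sum_data_bytes" is a
  hypothetical parser that runs under the chain read lock, so it stays short
  and does not call back into tdb.
*/
static int sum_data_bytes(TDB_DATA key, TDB_DATA data, void *private_data)
{
	size_t *total = (size_t *)private_data;

	(void)key;
	*total += data.dsize;
	return 0;
}

static int parse_sketch(struct tdb_context *db, TDB_DATA key)
{
	size_t total = 0;
	int ret;

	ret = tdb_parse_record(db, key, sum_data_bytes, &total);
	/* ... on success "total" now holds the record's data length ... */
	return ret;
}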
3246 /* check if an entry in the database exists
3248 note that 1 is returned if the key is found and 0 is returned if not found
3249 this doesn't match the conventions in the rest of this module, but is
3250 compatible with gdbm
3252 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3254 struct list_struct rec;
3256 if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3258 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3262 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3264 u32 hash = tdb->hash_fn(&key);
3265 return tdb_exists_hash(tdb, key, hash);
3268 /* actually delete an entry in the database given the offset */
3269 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3271 tdb_off_t last_ptr, i;
3272 struct list_struct lastrec;
3274 if (tdb->read_only || tdb->traverse_read) return -1;
3276 if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3277 /* Someone traversing here: mark it as dead */
3278 rec->magic = TDB_DEAD_MAGIC;
3279 return tdb_rec_write(tdb, rec_ptr, rec);
3281 if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3284 /* find previous record in hash chain */
3285 if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3287 for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3288 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3291 /* unlink it: next ptr is at start of record. */
3293 last_ptr = TDB_HASH_TOP(rec->full_hash);
3294 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3297 /* recover the space */
3298 if (tdb_free(tdb, rec_ptr, rec) == -1)
3303 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3307 struct list_struct rec;
3309 /* read in the hash top */
3310 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3314 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3317 if (rec.magic == TDB_DEAD_MAGIC) {
3326 * Purge all DEAD records from a hash chain
3328 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3331 struct list_struct rec;
3334 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3338 /* read in the hash top */
3339 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3345 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3351 if (rec.magic == TDB_DEAD_MAGIC
3352 && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3359 tdb_unlock(tdb, -1, F_WRLCK);
3363 /* delete an entry in the database given a key */
3364 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3367 struct list_struct rec;
3370 if (tdb->max_dead_records != 0) {
3373 * Allow for some dead records per hash chain, mainly for
3374 * tdb's with a very high create/delete rate like locking.tdb.
3377 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3380 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3382 * Don't let the per-chain freelist grow too large,
3383 * delete all existing dead records
3385 tdb_purge_dead(tdb, hash);
3388 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3389 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3394 * Just mark the record as dead.
3396 rec.magic = TDB_DEAD_MAGIC;
3397 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3400 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3404 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3408 tdb_increment_seqnum(tdb);
3411 if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3412 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3416 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3418 u32 hash = tdb->hash_fn(&key);
3419 return tdb_delete_hash(tdb, key, hash);
3423 * See if we have a dead record around with enough space
3425 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3426 struct list_struct *r, tdb_len_t length)
3430 /* read in the hash top */
3431 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3434 /* keep looking until we find the right record */
3436 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3439 if (TDB_DEAD(r) && r->rec_len >= length) {
3441 * First fit for simple coding, TODO: change to best
3451 /* store an element in the database, replacing any existing element
3454 return 0 on success, -1 on failure
3456 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3458 struct list_struct rec;
3464 if (tdb->read_only || tdb->traverse_read) {
3465 tdb->ecode = TDB_ERR_RDONLY;
3469 /* find which hash bucket it is in */
3470 hash = tdb->hash_fn(&key);
3471 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3474 /* check for it existing, on insert. */
3475 if (flag == TDB_INSERT) {
3476 if (tdb_exists_hash(tdb, key, hash)) {
3477 tdb->ecode = TDB_ERR_EXISTS;
3481 /* first try in-place update, on modify or replace. */
3482 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3485 if (tdb->ecode == TDB_ERR_NOEXIST &&
3486 flag == TDB_MODIFY) {
3487 /* if the record doesn't exist and we are in TDB_MODIFY mode then
3488 we should fail the store */
3492 /* reset the error code potentially set by the tdb_update() */
3493 tdb->ecode = TDB_SUCCESS;
3495 /* delete any existing record - if it doesn't exist we don't
3496 care. Doing this first reduces fragmentation, and avoids
3497 coalescing with `allocated' block before it's updated. */
3498 if (flag != TDB_INSERT)
3499 tdb_delete_hash(tdb, key, hash);
3501 /* Copy key+value *before* allocating free space in case malloc
3502 fails and we are left with a dead spot in the tdb. */
3504 if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3505 tdb->ecode = TDB_ERR_OOM;
3509 memcpy(p, key.dptr, key.dsize);
3511 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3513 if (tdb->max_dead_records != 0) {
3515 * Allow for some dead records per hash chain, look if we can
3516 * find one that can hold the new record. We need enough space
3517 * for key, data and tailer. If we find one, we don't have to
3518 * consult the central freelist.
3520 rec_ptr = tdb_find_dead(
3522 key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3525 rec.key_len = key.dsize;
3526 rec.data_len = dbuf.dsize;
3527 rec.full_hash = hash;
3528 rec.magic = TDB_MAGIC;
3529 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3530 || tdb->methods->tdb_write(
3531 tdb, rec_ptr + sizeof(rec),
3532 p, key.dsize + dbuf.dsize) == -1) {
3540 * We have to allocate some space from the freelist, so this means we
3541 * have to lock it. Use the chance to purge all the DEAD records from
3542 * the hash chain under the freelist lock.
3545 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3549 if ((tdb->max_dead_records != 0)
3550 && (tdb_purge_dead(tdb, hash) == -1)) {
3551 tdb_unlock(tdb, -1, F_WRLCK);
3555 /* we have to allocate some space */
3556 rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3558 tdb_unlock(tdb, -1, F_WRLCK);
3564 /* Read hash top into next ptr */
3565 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3568 rec.key_len = key.dsize;
3569 rec.data_len = dbuf.dsize;
3570 rec.full_hash = hash;
3571 rec.magic = TDB_MAGIC;
3573 /* write out and point the top of the hash chain at it */
3574 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3575 || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3576 || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3577 /* Need to tdb_unallocate() here */
3585 tdb_increment_seqnum(tdb);
3589 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
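/*
  Illustrative sketch (not part of tdb itself): storing a key/value pair. A
  flag of TDB_INSERT fails if the key already exists and TDB_MODIFY fails if it
  does not; a flag of 0 replaces or creates, as tdb_append() below uses. The
  literal key and value are hypothetical.
*/
static int store_sketch(struct tdb_context *db)
{
	TDB_DATA key, val;

	key.dptr = (unsigned char *)"example-key";
	key.dsize = strlen("example-key");
	val.dptr = (unsigned char *)"example-value";
	val.dsize = strlen("example-value");

	return tdb_store(db, key, val, 0);	/* 0 on success, -1 on failure */
}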
3594 /* Append to an entry. Create if it does not exist. */
3595 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3601 /* find which hash bucket it is in */
3602 hash = tdb->hash_fn(&key);
3603 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3606 dbuf = tdb_fetch(tdb, key);
3608 if (dbuf.dptr == NULL) {
3609 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3611 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3612 dbuf.dsize + new_dbuf.dsize);
3613 if (new_dptr == NULL) {
3616 dbuf.dptr = new_dptr;
3619 if (dbuf.dptr == NULL) {
3620 tdb->ecode = TDB_ERR_OOM;
3624 memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3625 dbuf.dsize += new_dbuf.dsize;
3627 ret = tdb_store(tdb, key, dbuf, 0);
3630 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3631 SAFE_FREE(dbuf.dptr);
3637 return the name of the current tdb file
3638 useful for external logging functions
3640 const char *tdb_name(struct tdb_context *tdb)
3646 return the underlying file descriptor being used by tdb, or -1
3647 useful for external routines that want to check the device/inode
3650 int tdb_fd(struct tdb_context *tdb)
3656 return the current logging function
3657 useful for external tdb routines that wish to log tdb errors
3659 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3661 return tdb->log.log_fn;
3666 get the tdb sequence number. Only makes sense if the writers opened
3667 with TDB_SEQNUM set. Note that this sequence number will wrap quite
3668 quickly, so it should only be used for a 'has something changed'
3669 test, not for code that relies on the count of the number of changes
3670 made. If you want a counter then use a tdb record.
3672 The aim of this sequence number is to allow for a very lightweight
3673 test of a possible tdb change.
3675 int tdb_get_seqnum(struct tdb_context *tdb)
3679 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3683 int tdb_hash_size(struct tdb_context *tdb)
3685 return tdb->header.hash_size;
3688 size_t tdb_map_size(struct tdb_context *tdb)
3690 return tdb->map_size;
3693 int tdb_get_flags(struct tdb_context *tdb)
3700 enable sequence number handling on an open tdb
3702 void tdb_enable_seqnum(struct tdb_context *tdb)
3704 tdb->flags |= TDB_SEQNUM;
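/*
  Illustrative sketch (not part of tdb itself): using the sequence number as a
  cheap "has anything changed?" test, per the tdb_get_seqnum() comment above.
  Writers must have opened the database with TDB_SEQNUM (or called
  tdb_enable_seqnum()) for the number to move at all.
*/
static int has_changed_sketch(struct tdb_context *db, int *last_seqnum)
{
	int now = tdb_get_seqnum(db);

	if (now != *last_seqnum) {
		*last_seqnum = now;
		return 1;	/* something changed since we last looked */
	}
	return 0;
}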
3709 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3710 static struct tdb_context *tdbs = NULL;
3713 /* This is based on the hash algorithm from gdbm */
3714 static unsigned int default_tdb_hash(TDB_DATA *key)
3716 u32 value; /* Used to compute the hash value. */
3717 u32 i; /* Used to cycle through random values. */
3719 /* Set the initial value from the key size. */
3720 for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3721 value = (value + (key->dptr[i] << (i*5 % 24)));
3723 return (1103515243 * value + 12345);
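/*
  Illustrative sketch (not part of tdb itself): a caller-supplied hash function
  for tdb_open_ex(). It only needs to match default_tdb_hash()'s signature; the
  FNV-style constants below are hypothetical. Every opener of the same file has
  to use the same hash function, or the hash chains will not line up.
*/
static unsigned int example_hash_fn(TDB_DATA *key)
{
	u32 value = 2166136261u;
	u32 i;

	for (i = 0; i < key->dsize; i++) {
		value = (value ^ key->dptr[i]) * 16777619u;
	}
	return value;
}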
3727 /* initialise a new database with a specified hash size */
3728 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3730 struct tdb_header *newdb;
3733 /* We make it up in memory, then write it out if not internal */
3734 size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3735 if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3736 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3738 /* Fill in the header */
3739 newdb->version = TDB_VERSION;
3740 newdb->hash_size = hash_size;
3741 if (tdb->flags & TDB_INTERNAL) {
3742 tdb->map_size = size;
3743 tdb->map_ptr = (char *)newdb;
3744 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3745 /* Convert the `ondisk' version if asked. */
3749 if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3752 if (ftruncate(tdb->fd, 0) == -1)
3755 /* This creates an endian-converted header, as if read from disk */
3757 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3758 /* Don't endian-convert the magic food! */
3759 memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3760 if (write(tdb->fd, newdb, size) != size) {
3773 static int tdb_already_open(dev_t device,
3776 struct tdb_context *i;
3778 for (i = tdbs; i; i = i->next) {
3779 if (i->device == device && i->inode == ino) {
3787 /* open the database, creating it if necessary
3789 The open_flags and mode are passed straight to the open call on the
3790 database file. A flags value of O_WRONLY is invalid. The hash size
3791 is advisory, use zero for a default value.
3793 Return is NULL on error, in which case errno is also set. Don't
3794 try to call tdb_error or tdb_errname, just do strerror(errno).
3796 @param name may be NULL for internal databases. */
3797 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3798 int open_flags, mode_t mode)
3800 return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
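/*
  Illustrative sketch (not part of tdb itself): opening (and creating, if
  necessary) a database file. On failure NULL is returned and errno is set, as
  the comment above tdb_open() notes. The filename is hypothetical and
  <fcntl.h>/<stdio.h> are assumed to be in scope, as elsewhere in this file.
*/
static struct tdb_context *open_sketch(void)
{
	struct tdb_context *db;

	db = tdb_open("example.tdb", 0 /* default hash size */, 0 /* tdb_flags */,
		      O_RDWR | O_CREAT, 0600);
	if (db == NULL) {
		fprintf(stderr, "tdb_open failed: %s\n", strerror(errno));
	}
	return db;
}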
3803 /* a default logging function */
3804 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3805 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3810 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3811 int open_flags, mode_t mode,
3812 const struct tdb_logging_context *log_ctx,
3813 tdb_hash_func hash_fn)
3815 struct tdb_context *tdb;
3817 int rev = 0, locked = 0;
3821 if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3822 /* Can't log this */
3829 tdb->map_ptr = NULL;
3830 tdb->flags = tdb_flags;
3831 tdb->open_flags = open_flags;
3833 tdb->log = *log_ctx;
3835 tdb->log.log_fn = null_log_fn;
3836 tdb->log.log_private = NULL;
3838 tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3840 /* cache the page size */
3841 tdb->page_size = getpagesize();
3842 if (tdb->page_size <= 0) {
3843 tdb->page_size = 0x2000;
3846 if ((open_flags & O_ACCMODE) == O_WRONLY) {
3847 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3854 hash_size = DEFAULT_HASH_SIZE;
3855 if ((open_flags & O_ACCMODE) == O_RDONLY) {
3857 /* read only databases don't do locking or clear if first */
3858 tdb->flags |= TDB_NOLOCK;
3859 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3862 /* internal databases don't mmap or lock, and start off cleared */
3863 if (tdb->flags & TDB_INTERNAL) {
3864 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3865 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3866 if (tdb_new_database(tdb, hash_size) != 0) {
3867 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3873 if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3874 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3875 name, strerror(errno)));
3876 goto fail; /* errno set by open(2) */
3879 /* ensure there is only one process initialising at once */
3880 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3881 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3882 name, strerror(errno)));
3883 goto fail; /* errno set by tdb_brlock */
3886 /* we need to zero database if we are the only one with it open */
3887 if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3888 (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3889 open_flags |= O_CREAT;
3890 if (ftruncate(tdb->fd, 0) == -1) {
3891 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3892 "failed to truncate %s: %s\n",
3893 name, strerror(errno)));
3894 goto fail; /* errno set by ftruncate */
3898 if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3899 || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3900 || (tdb->header.version != TDB_VERSION
3901 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3902 /* it's not a valid database - possibly initialise it */
3903 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3904 errno = EIO; /* ie bad format or something */
3907 rev = (tdb->flags & TDB_CONVERT);
3909 vp = (unsigned char *)&tdb->header.version;
3910 vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3911 (((u32)vp[2]) << 8) | (u32)vp[3];
3912 tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3914 tdb->flags &= ~TDB_CONVERT;
3916 tdb->flags |= TDB_CONVERT;
3917 tdb_convert(&tdb->header, sizeof(tdb->header));
3919 if (fstat(tdb->fd, &st) == -1)
3922 if (tdb->header.rwlocks != 0) {
3923 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3927 /* Is it already in the open list? If so, fail. */
3928 if (tdb_already_open(st.st_dev, st.st_ino)) {
3929 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3930 "%s (%d,%d) is already open in this process\n",
3931 name, (int)st.st_dev, (int)st.st_ino));
3936 if (!(tdb->name = (char *)strdup(name))) {
3941 tdb->map_size = st.st_size;
3942 tdb->device = st.st_dev;
3943 tdb->inode = st.st_ino;
3944 tdb->max_dead_records = 0;
3947 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3948 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3949 "failed to take ACTIVE_LOCK on %s: %s\n",
3950 name, strerror(errno)));
3956 /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3957 we didn't get the initial exclusive lock as we need to let all other
3958 users know we're using it. */
3960 if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3961 /* leave this lock in place to indicate it's in use */
3962 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3966 /* if needed, run recovery */
3967 if (tdb_transaction_recover(tdb) == -1) {
3972 /* Internal (memory-only) databases skip all the code above to
3973 * do with disk files, and resume here by releasing their
3974 * global lock and hooking into the active list. */
3975 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3982 { int save_errno = errno;
3988 if (tdb->flags & TDB_INTERNAL)
3989 SAFE_FREE(tdb->map_ptr);
3993 SAFE_FREE(tdb->name);
3995 if (close(tdb->fd) != 0)
3996 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
4004 * Set the maximum number of dead records per hash chain
4007 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4009 tdb->max_dead_records = max_dead;
4015 * @returns -1 for error; 0 for success.
4017 int tdb_close(struct tdb_context *tdb)
4019 struct tdb_context **i;
4022 if (tdb->transaction) {
4023 tdb_transaction_cancel(tdb);
4027 if (tdb->flags & TDB_INTERNAL)
4028 SAFE_FREE(tdb->map_ptr);
4032 SAFE_FREE(tdb->name);
4034 ret = close(tdb->fd);
4035 SAFE_FREE(tdb->lockrecs);
4037 /* Remove from contexts list */
4038 for (i = &tdbs; *i; i = &(*i)->next) {
4045 memset(tdb, 0, sizeof(*tdb));
4051 /* register a logging function */
4052 void tdb_set_logging_function(struct tdb_context *tdb,
4053 const struct tdb_logging_context *log_ctx)
4055 tdb->log = *log_ctx;
4058 void *tdb_get_logging_private(struct tdb_context *tdb)
4060 return tdb->log.log_private;
4063 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4064 seek pointer from our parent and to re-establish locks */
4065 int tdb_reopen(struct tdb_context *tdb)
4069 if (tdb->flags & TDB_INTERNAL) {
4070 return 0; /* Nothing to do. */
4073 if (tdb->num_locks != 0 || tdb->global_lock.count) {
4074 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4078 if (tdb->transaction != 0) {
4079 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4083 if (tdb_munmap(tdb) != 0) {
4084 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4087 if (close(tdb->fd) != 0)
4088 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4089 tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4090 if (tdb->fd == -1) {
4091 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4094 if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4095 (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4096 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4099 if (fstat(tdb->fd, &st) != 0) {
4100 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4103 if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4104 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4116 /* reopen all tdb's */
4117 int tdb_reopen_all(int parent_longlived)
4119 struct tdb_context *tdb;
4121 for (tdb=tdbs; tdb; tdb = tdb->next) {
4123 * If the parent is long-lived (i.e. a
4124 * parent daemon architecture), we know
4125 * it will keep its active lock on a
4126 * tdb opened with CLEAR_IF_FIRST. Thus
4127 * for child processes we don't have to
4128 * add an active lock. This is essential
4129 * to improve performance on systems that
4130 * keep POSIX locks as a non-scalable data
4131 * structure in the kernel.
4133 if (parent_longlived) {
4134 /* Ensure no clear-if-first. */
4135 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4138 if (tdb_reopen(tdb) != 0)
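/*
  Illustrative sketch (not part of tdb itself): fcntl locks and the file offset
  are not safe to share across fork(), so a child process re-opens every tdb it
  inherited before touching any of them. parent_longlived = 1 matches the
  long-lived daemon case described in the comment above.
*/
static int after_fork_in_child_sketch(void)
{
	if (tdb_reopen_all(1 /* parent is long-lived */) != 0) {
		return -1;	/* at least one database could not be re-opened */
	}
	return 0;
}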