2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb
4 Last Changed: 2007-04-03 05:08:18 -0400
7 trivial database library - standalone version
9 Copyright (C) Andrew Tridgell 1999-2005
10 Copyright (C) Jeremy Allison 2000-2006
11 Copyright (C) Paul `Rusty' Russell 2000
13 ** NOTE! The following LGPL license applies to the tdb
14 ** library. This does NOT imply that all of Samba is released
17 This library is free software; you can redistribute it and/or
18 modify it under the terms of the GNU Lesser General Public
19 License as published by the Free Software Foundation; either
20 version 2 of the License, or (at your option) any later version.
22 This library is distributed in the hope that it will be useful,
23 but WITHOUT ANY WARRANTY; without even the implied warranty of
24 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25 Lesser General Public License for more details.
27 You should have received a copy of the GNU Lesser General Public
28 License along with this library; if not, write to the Free Software
29 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
32 #ifdef CONFIG_STAND_ALONE
35 #define HAVE_SYS_MMAN_H
39 #define _XOPEN_SOURCE 500
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
52 #include <sys/types.h>
61 #ifdef HAVE_SYS_MMAN_H
70 #define MAP_FAILED ((void *)-1)
74 #define strdup rep_strdup
75 static char *rep_strdup(const char *s)
85 ret = malloc(length + 1);
87 strncpy(ret, s, length);
94 #ifndef PRINTF_ATTRIBUTE
95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
96 /** Use gcc attribute to check printf fns. a1 is the 1-based index of
97 * the parameter containing the format, and a2 the index of the first
98 * argument. Note that some gcc 2.x versions don't handle this
100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
102 #define PRINTF_ATTRIBUTE(a1, a2)
112 #ifndef HAVE_GETPAGESIZE
113 #define getpagesize() 0x2000
116 typedef u32 tdb_len_t;
117 typedef u32 tdb_off_t;
120 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
123 #define TDB_MAGIC_FOOD "TDB file\n"
124 #define TDB_VERSION (0x26011967 + 6)
125 #define TDB_MAGIC (0x26011999U)
126 #define TDB_FREE_MAGIC (~TDB_MAGIC)
127 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
128 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
129 #define TDB_ALIGNMENT 4
130 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
131 #define DEFAULT_HASH_SIZE 131
132 #define FREELIST_TOP (sizeof(struct tdb_header))
133 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
134 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
135 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
136 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
137 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
138 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
139 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
140 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
141 #define TDB_SEQNUM_OFS offsetof(struct tdb_header, sequence_number)
142 #define TDB_PAD_BYTE 0x42
143 #define TDB_PAD_U32 0x42424242
145 /* NB assumes there is a local variable called "tdb" that is the
146 * current context, also takes doubly-parenthesized print-style
148 #define TDB_LOG(x) tdb->log.log_fn x
151 #define GLOBAL_LOCK 0
152 #define ACTIVE_LOCK 4
153 #define TRANSACTION_LOCK 8
155 /* free memory if the pointer is valid and zero the pointer */
157 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
160 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
162 #define DOCONV() (tdb->flags & TDB_CONVERT)
163 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
166 /* the body of the database is made of one list_struct for the free space
167 plus a separate data list for each hash value */
169 tdb_off_t next; /* offset of the next record in the list */
170 tdb_len_t rec_len; /* total byte length of record */
171 tdb_len_t key_len; /* byte length of key */
172 tdb_len_t data_len; /* byte length of data */
173 u32 full_hash; /* the full 32 bit hash of the key */
174 u32 magic; /* try to catch errors */
175 /* the following union is implied:
177 char record[rec_len];
182 u32 totalsize; (tailer)
188 /* this is stored at the front of every database */
190 char magic_food[32]; /* for /etc/magic */
191 u32 version; /* version of the code */
192 u32 hash_size; /* number of hash entries */
193 tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
194 tdb_off_t recovery_start; /* offset of transaction recovery region */
195 tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
196 tdb_off_t reserved[29];
199 struct tdb_lock_type {
205 struct tdb_traverse_lock {
206 struct tdb_traverse_lock *next;
214 int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
215 int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
216 void (*next_hash_chain)(struct tdb_context *, u32 *);
217 int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
218 int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
219 int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
223 char *name; /* the name of the database */
224 void *map_ptr; /* where it is currently mapped */
225 int fd; /* open file descriptor for the database */
226 tdb_len_t map_size; /* how much space has been mapped */
227 int read_only; /* opened read-only */
228 int traverse_read; /* read-only traversal */
229 struct tdb_lock_type global_lock;
231 struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
232 enum TDB_ERROR ecode; /* error code for last tdb error */
233 struct tdb_header header; /* a cached copy of the header */
234 u32 flags; /* the flags passed to tdb_open */
235 struct tdb_traverse_lock travlocks; /* current traversal locks */
236 struct tdb_context *next; /* all tdbs to avoid multiple opens */
237 dev_t device; /* uniquely identifies this tdb */
238 ino_t inode; /* uniquely identifies this tdb */
239 struct tdb_logging_context log;
240 unsigned int (*hash_fn)(TDB_DATA *key);
241 int open_flags; /* flags used in the open - needed by reopen */
242 unsigned int num_locks; /* number of chain locks held */
243 const struct tdb_methods *methods;
244 struct tdb_transaction *transaction;
246 int max_dead_records;
253 static int tdb_munmap(struct tdb_context *tdb);
254 static void tdb_mmap(struct tdb_context *tdb);
255 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
256 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
257 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
258 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
259 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
260 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
261 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
262 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
263 static void *tdb_convert(void *buf, u32 size);
264 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
265 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
266 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
269 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
270 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
271 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
272 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
273 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
274 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
275 tdb_off_t offset, tdb_len_t len,
276 int (*parser)(TDB_DATA key, TDB_DATA data,
279 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
280 struct list_struct *rec);
281 static void tdb_io_init(struct tdb_context *tdb);
282 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
287 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
292 static struct tdb_errname {
293 enum TDB_ERROR ecode; const char *estring;
294 } emap[] = { {TDB_SUCCESS, "Success"},
295 {TDB_ERR_CORRUPT, "Corrupt database"},
296 {TDB_ERR_IO, "IO Error"},
297 {TDB_ERR_LOCK, "Locking error"},
298 {TDB_ERR_OOM, "Out of memory"},
299 {TDB_ERR_EXISTS, "Record exists"},
300 {TDB_ERR_NOLOCK, "Lock exists on other keys"},
301 {TDB_ERR_EINVAL, "Invalid parameter"},
302 {TDB_ERR_NOEXIST, "Record does not exist"},
303 {TDB_ERR_RDONLY, "write not permitted"} };
305 /* Error string for the last tdb error */
306 const char *tdb_errorstr(struct tdb_context *tdb)
309 for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
310 if (tdb->ecode == emap[i].ecode)
311 return emap[i].estring;
312 return "Invalid error code";
317 /* a byte range locking function - return 0 on success
318 this function locks/unlocks 1 byte at the specified offset.
320 On error, errno is also set so that errors are passed back properly
323 note that a len of zero means lock to end of file
325 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
326 int rw_type, int lck_type, int probe, size_t len)
331 if (tdb->flags & TDB_NOLOCK) {
335 if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
336 tdb->ecode = TDB_ERR_RDONLY;
341 fl.l_whence = SEEK_SET;
347 ret = fcntl(tdb->fd,lck_type,&fl);
348 } while (ret == -1 && errno == EINTR);
351 /* Generic lock error. errno set by fcntl.
352 * EAGAIN is an expected return from non-blocking
354 if (!probe && lck_type != F_SETLK) {
355 /* Ensure error code is set for log fun to examine. */
356 tdb->ecode = TDB_ERR_LOCK;
357 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
358 tdb->fd, offset, rw_type, lck_type, (int)len));
360 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
367 upgrade a read lock to a write lock. This needs to be handled in a
368 special way as some OSes (such as solaris) have too conservative
369 deadlock detection and claim a deadlock when progress can be
370 made. For those OSes we may loop for a while.
372 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
377 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
380 if (errno != EDEADLK) {
383 /* sleep for as short a time as we can - more portable than usleep() */
386 select(0, NULL, NULL, NULL, &tv);
388 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
393 /* lock a list in the database. list -1 is the alloc list */
394 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
396 struct tdb_lock_type *new_lck;
399 /* a global lock allows us to avoid per chain locks */
400 if (tdb->global_lock.count &&
401 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
405 if (tdb->global_lock.count) {
406 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
409 if (list < -1 || list >= (int)tdb->header.hash_size) {
410 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
414 if (tdb->flags & TDB_NOLOCK)
417 for (i=0; i<tdb->num_lockrecs; i++) {
418 if (tdb->lockrecs[i].list == list) {
419 if (tdb->lockrecs[i].count == 0) {
421 * Can't happen, see tdb_unlock(). It should
424 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
425 "lck->count == 0 for list %d", list));
428 * Just increment the in-memory struct, posix locks
431 tdb->lockrecs[i].count++;
436 new_lck = (struct tdb_lock_type *)realloc(
438 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
439 if (new_lck == NULL) {
443 tdb->lockrecs = new_lck;
445 /* Since fcntl locks don't nest, we do a lock for the first one,
446 and simply bump the count for future ones */
447 if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW,
449 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
450 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
456 tdb->lockrecs[tdb->num_lockrecs].list = list;
457 tdb->lockrecs[tdb->num_lockrecs].count = 1;
458 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
459 tdb->num_lockrecs += 1;
464 /* unlock the database: returns void because it's too late for errors. */
465 /* changed to return int it may be interesting to know there
466 has been an error --simo */
467 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
471 struct tdb_lock_type *lck = NULL;
473 /* a global lock allows us to avoid per chain locks */
474 if (tdb->global_lock.count &&
475 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
479 if (tdb->global_lock.count) {
480 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
483 if (tdb->flags & TDB_NOLOCK)
487 if (list < -1 || list >= (int)tdb->header.hash_size) {
488 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
492 for (i=0; i<tdb->num_lockrecs; i++) {
493 if (tdb->lockrecs[i].list == list) {
494 lck = &tdb->lockrecs[i];
499 if ((lck == NULL) || (lck->count == 0)) {
500 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
504 if (lck->count > 1) {
510 * This lock has count==1 left, so we need to unlock it in the
511 * kernel. We don't bother with decrementing the in-memory array
512 * element, we're about to overwrite it with the last array element
516 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
521 * Shrink the array by overwriting the element just unlocked with the
522 * last array element.
525 if (tdb->num_lockrecs > 1) {
526 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
528 tdb->num_lockrecs -= 1;
531 * We don't bother with realloc when the array shrinks, but if we have
532 * a completely idle tdb we should get rid of the locked array.
535 if (tdb->num_lockrecs == 0) {
536 SAFE_FREE(tdb->lockrecs);
540 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
546 /* lock/unlock entire database */
547 static int _tdb_lockall(struct tdb_context *tdb, int ltype)
549 /* There are no locks on read-only dbs */
550 if (tdb->read_only || tdb->traverse_read)
551 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
553 if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
554 tdb->global_lock.count++;
558 if (tdb->global_lock.count) {
559 /* a global lock of a different type exists */
560 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
563 if (tdb->num_locks != 0) {
564 /* can't combine global and chain locks */
565 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
568 if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW,
569 0, 4*tdb->header.hash_size)) {
570 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
574 tdb->global_lock.count = 1;
575 tdb->global_lock.ltype = ltype;
580 /* unlock entire db */
581 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
583 /* There are no locks on read-only dbs */
584 if (tdb->read_only || tdb->traverse_read) {
585 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
588 if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
589 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
592 if (tdb->global_lock.count > 1) {
593 tdb->global_lock.count--;
597 if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
598 0, 4*tdb->header.hash_size)) {
599 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
603 tdb->global_lock.count = 0;
604 tdb->global_lock.ltype = 0;
609 /* lock entire database with write lock */
610 int tdb_lockall(struct tdb_context *tdb)
612 return _tdb_lockall(tdb, F_WRLCK);
615 /* unlock entire database with write lock */
616 int tdb_unlockall(struct tdb_context *tdb)
618 return _tdb_unlockall(tdb, F_WRLCK);
621 /* lock entire database with read lock */
622 int tdb_lockall_read(struct tdb_context *tdb)
624 return _tdb_lockall(tdb, F_RDLCK);
627 /* unlock entire database with read lock */
628 int tdb_unlockall_read(struct tdb_context *tdb)
630 return _tdb_unlockall(tdb, F_RDLCK);
633 /* lock/unlock one hash chain. This is meant to be used to reduce
634 contention - it cannot guarantee how many records will be locked */
635 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
637 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
640 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
642 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
645 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
647 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
650 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
652 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
657 /* record lock stops delete underneath */
658 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
660 return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
664 Write locks override our own fcntl readlocks, so check it here.
665 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
666 an error to fail to get the lock here.
668 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
670 struct tdb_traverse_lock *i;
671 for (i = &tdb->travlocks; i; i = i->next)
674 return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
678 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
679 an error to fail to get the lock here.
681 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
683 return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
686 /* fcntl locks don't stack: avoid unlocking someone else's */
687 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
689 struct tdb_traverse_lock *i;
694 for (i = &tdb->travlocks; i; i = i->next)
697 return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
702 /* check for an out of bounds access - if it is out of bounds then
703 see if the database has been expanded by someone else and expand
705 note that "len" is the minimum length needed for the db
707 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
710 if (len <= tdb->map_size)
712 if (tdb->flags & TDB_INTERNAL) {
714 /* Ensure ecode is set for log fn. */
715 tdb->ecode = TDB_ERR_IO;
716 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
717 (int)len, (int)tdb->map_size));
719 return TDB_ERRCODE(TDB_ERR_IO, -1);
722 if (fstat(tdb->fd, &st) == -1) {
723 return TDB_ERRCODE(TDB_ERR_IO, -1);
726 if (st.st_size < (size_t)len) {
728 /* Ensure ecode is set for log fn. */
729 tdb->ecode = TDB_ERR_IO;
730 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
731 (int)len, (int)st.st_size));
733 return TDB_ERRCODE(TDB_ERR_IO, -1);
736 /* Unmap, update size, remap */
737 if (tdb_munmap(tdb) == -1)
738 return TDB_ERRCODE(TDB_ERR_IO, -1);
739 tdb->map_size = st.st_size;
744 /* write a lump of data at a specified offset */
745 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
746 const void *buf, tdb_len_t len)
752 if (tdb->read_only || tdb->traverse_read) {
753 tdb->ecode = TDB_ERR_RDONLY;
757 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
761 memcpy(off + (char *)tdb->map_ptr, buf, len);
762 } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
763 /* Ensure ecode is set for log fn. */
764 tdb->ecode = TDB_ERR_IO;
765 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
766 off, len, strerror(errno)));
767 return TDB_ERRCODE(TDB_ERR_IO, -1);
772 /* Endian conversion: we only ever deal with 4 byte quantities */
773 void *tdb_convert(void *buf, u32 size)
775 u32 i, *p = (u32 *)buf;
776 for (i = 0; i < size / 4; i++)
777 p[i] = TDB_BYTEREV(p[i]);
782 /* read a lump of data at a specified offset, maybe convert */
783 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
784 tdb_len_t len, int cv)
786 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
791 memcpy(buf, off + (char *)tdb->map_ptr, len);
793 ssize_t ret = pread(tdb->fd, buf, len, off);
794 if (ret != (ssize_t)len) {
795 /* Ensure ecode is set for log fn. */
796 tdb->ecode = TDB_ERR_IO;
797 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
798 "len=%d ret=%d (%s) map_size=%d\n",
799 (int)off, (int)len, (int)ret, strerror(errno),
800 (int)tdb->map_size));
801 return TDB_ERRCODE(TDB_ERR_IO, -1);
805 tdb_convert(buf, len);
813 do an unlocked scan of the hash table heads to find the next non-zero head. The value
814 will then be confirmed with the lock held
816 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
820 for (;h < tdb->header.hash_size;h++) {
821 if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
827 for (;h < tdb->header.hash_size;h++) {
828 if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
837 int tdb_munmap(struct tdb_context *tdb)
839 if (tdb->flags & TDB_INTERNAL)
844 int ret = munmap(tdb->map_ptr, tdb->map_size);
853 void tdb_mmap(struct tdb_context *tdb)
855 if (tdb->flags & TDB_INTERNAL)
859 if (!(tdb->flags & TDB_NOMMAP)) {
860 tdb->map_ptr = mmap(NULL, tdb->map_size,
861 PROT_READ|(tdb->read_only? 0:PROT_WRITE),
862 MAP_SHARED|MAP_FILE, tdb->fd, 0);
865 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
868 if (tdb->map_ptr == MAP_FAILED) {
870 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
871 tdb->map_size, strerror(errno)));
881 /* expand a file. we prefer to use ftruncate, as that is what posix
882 says to use for mmap expansion */
883 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
887 if (tdb->read_only || tdb->traverse_read) {
888 tdb->ecode = TDB_ERR_RDONLY;
892 if (ftruncate(tdb->fd, size+addition) == -1) {
894 if (pwrite(tdb->fd, &b, 1, (size+addition) - 1) != 1) {
895 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
896 size+addition, strerror(errno)));
901 /* now fill the file with something. This ensures that the
902 file isn't sparse, which would be very bad if we ran out of
903 disk. This must be done with write, not via mmap */
904 memset(buf, TDB_PAD_BYTE, sizeof(buf));
906 int n = addition>sizeof(buf)?sizeof(buf):addition;
907 int ret = pwrite(tdb->fd, buf, n, size);
909 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
910 n, strerror(errno)));
920 /* expand the database at least size bytes by expanding the underlying
921 file and doing the mmap again if necessary */
922 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
924 struct list_struct rec;
927 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
928 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
932 /* must know about any previous expansions by another process */
933 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
935 /* always make room for at least 10 more records, and round
936 the database up to a multiple of the page size */
937 size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
939 if (!(tdb->flags & TDB_INTERNAL))
943 * We must ensure the file is unmapped before doing this
944 * to ensure consistency with systems like OpenBSD where
945 * writes and mmaps are not consistent.
948 /* expand the file itself */
949 if (!(tdb->flags & TDB_INTERNAL)) {
950 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
954 tdb->map_size += size;
956 if (tdb->flags & TDB_INTERNAL) {
957 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
960 tdb->map_size -= size;
963 tdb->map_ptr = new_map_ptr;
966 * We must ensure the file is remapped before adding the space
967 * to ensure consistency with systems like OpenBSD where
968 * writes and mmaps are not consistent.
971 /* We're ok if the mmap fails as we'll fallback to read/write */
975 /* form a new freelist record */
976 memset(&rec,'\0',sizeof(rec));
977 rec.rec_len = size - sizeof(rec);
979 /* link it into the free list */
980 offset = tdb->map_size - size;
981 if (tdb_free(tdb, offset, &rec) == -1)
984 tdb_unlock(tdb, -1, F_WRLCK);
987 tdb_unlock(tdb, -1, F_WRLCK);
991 /* read/write a tdb_off_t */
992 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
994 return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
997 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1000 return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1004 /* read a lump of data, allocating the space for it */
1005 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1009 /* some systems don't like zero length malloc */
1014 if (!(buf = (unsigned char *)malloc(len))) {
1015 /* Ensure ecode is set for log fn. */
1016 tdb->ecode = TDB_ERR_OOM;
1017 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1018 len, strerror(errno)));
1019 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1021 if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1028 /* Give a piece of tdb data to a parser */
1030 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1031 tdb_off_t offset, tdb_len_t len,
1032 int (*parser)(TDB_DATA key, TDB_DATA data,
1033 void *private_data),
1041 if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1043 * Optimize by avoiding the malloc/memcpy/free, point the
1044 * parser directly at the mmap area.
1046 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1049 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1050 return parser(key, data, private_data);
1053 if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1057 result = parser(key, data, private_data);
1062 /* read/write a record */
1063 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1065 if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1067 if (TDB_BAD_MAGIC(rec)) {
1068 /* Ensure ecode is set for log fn. */
1069 tdb->ecode = TDB_ERR_CORRUPT;
1070 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1071 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1073 return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1076 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1078 struct list_struct r = *rec;
1079 return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1082 static const struct tdb_methods io_methods = {
1085 tdb_next_hash_chain,
1092 initialise the default methods table
1094 void tdb_io_init(struct tdb_context *tdb)
1096 tdb->methods = &io_methods;
1099 /* file: transaction.c */
1104 - only allow a single transaction at a time per database. This makes
1105 using the transaction API simpler, as otherwise the caller would
1106 have to cope with temporary failures in transactions that conflict
1107 with other current transactions
1109 - keep the transaction recovery information in the same file as the
1110 database, using a special 'transaction recovery' record pointed at
1111 by the header. This removes the need for extra journal files as
1112 used by some other databases
1114 - dynamically allocate the transaction recovery record, re-using it
1115 for subsequent transactions. If a larger record is needed then
1116 tdb_free() the old record to place it on the normal tdb freelist
1117 before allocating the new record
1119 - during transactions, keep a linked list of all writes that have
1120 been performed by intercepting all tdb_write() calls. The hooked
1121 transaction versions of tdb_read() and tdb_write() check this
1122 linked list and try to use the elements of the list in preference
1123 to the real database.
1125 - don't allow any locks to be held when a transaction starts,
1126 otherwise we can end up with deadlock (plus lack of lock nesting
1127 in posix locks would mean the lock is lost)
1129 - if the caller gains a lock during the transaction but doesn't
1130 release it then fail the commit
1132 - allow for nested calls to tdb_transaction_start(), re-using the
1133 existing transaction record. If the inner transaction is cancelled
1134 then a subsequent commit will fail
1136 - keep a mirrored copy of the tdb hash chain heads to allow for the
1137 fast hash heads scan on traverse, updating the mirrored copy in
1138 the transaction version of tdb_write
1140 - allow callers to mix transaction and non-transaction use of tdb,
1141 although once a transaction is started then an exclusive lock is
1142 gained until the transaction is committed or cancelled
1144 - the commit strategy involves first saving away all modified data
1145 into a linearised buffer in the transaction recovery area, then
1146 marking the transaction recovery area with a magic value to
1147 indicate a valid recovery record. In total 4 fsync/msync calls are
1148 needed per commit to prevent race conditions. It might be possible
1149 to reduce this to 3 or even 2 with some more work.
1151 - check for a valid recovery record on open of the tdb, while the
1152 global lock is held. Automatically recover from the transaction
1153 recovery area if needed, then continue with the open as
1154 usual. This allows for smooth crash recovery with no administrator
1157 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1158 still available, but no transaction recovery area is used and no
1159 fsync/msync calls are made.
1163 struct tdb_transaction_el {
1164 struct tdb_transaction_el *next, *prev;
1167 unsigned char *data;
1171 hold the context of any current transaction
1173 struct tdb_transaction {
1174 /* we keep a mirrored copy of the tdb hash heads here so
1175 tdb_next_hash_chain() can operate efficiently */
1178 /* the original io methods - used to do IOs to the real db */
1179 const struct tdb_methods *io_methods;
1181 /* the list of transaction elements. We use a doubly linked
1182 list with a last pointer to allow us to keep the list
1183 ordered, with first element at the front of the list. It
1184 needs to be doubly linked as the read/write traversals need
1185 to be backwards, while the commit needs to be forwards */
1186 struct tdb_transaction_el *elements, *elements_last;
1188 /* non-zero when an internal transaction error has
1189 occurred. All write operations will then fail until the
1190 transaction is ended */
1191 int transaction_error;
1193 /* when inside a transaction we need to keep track of any
1194 nested tdb_transaction_start() calls, as these are allowed,
1195 but don't create a new transaction */
1198 /* old file size before transaction */
1199 tdb_len_t old_map_size;
1204 read while in a transaction. We need to check first if the data is in our list
1205 of transaction elements, then if not do a real read
1207 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1208 tdb_len_t len, int cv)
1210 struct tdb_transaction_el *el;
1212 /* we need to walk the list backwards to get the most recent data */
1213 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1216 if (off+len <= el->offset) {
1219 if (off >= el->offset + el->length) {
1223 /* an overlapping read - needs to be split into up to
1224 2 reads and a memcpy */
1225 if (off < el->offset) {
1226 partial = el->offset - off;
1227 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1232 buf = (void *)(partial + (char *)buf);
1234 if (off + len <= el->offset + el->length) {
1237 partial = el->offset + el->length - off;
1239 memcpy(buf, el->data + (off - el->offset), partial);
1241 tdb_convert(buf, len);
1245 buf = (void *)(partial + (char *)buf);
1247 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1254 /* it's not in the transaction elements - do a real read */
1255 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1258 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1259 tdb->ecode = TDB_ERR_IO;
1260 tdb->transaction->transaction_error = 1;
1266 write while in a transaction
/*
  Record a write of len bytes at offset off in the transaction element
  list; no write to the file is issued from the visible lines.
  - A write of exactly sizeof(tdb_off_t) bytes inside the hash table
    region also updates the cached copy in transaction->hash_heads.
  - Where the range overlaps an existing element the bytes are copied
    into that element; parts before or after the element are handled by
    recursive calls.
  - Otherwise the data is either appended (via realloc) to an element
    that ends exactly at off, provided the write lies entirely below
    old_map_size or starts beyond it, or stored in a newly allocated
    element linked at the tail of the list.
  A TDB_PAD_BYTE fill is used in place of a copy on some path (presumably
  when buf is NULL, as transaction_expand_file passes NULL - the test
  itself is not visible in this listing).
  Allocation failures set TDB_ERR_OOM and transaction_error; the common
  failure path logs and sets TDB_ERR_IO and transaction_error.
*/
1268 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1269 const void *buf, tdb_len_t len)
1271 struct tdb_transaction_el *el, *best_el=NULL;
1277 /* if the write is to a hash head, then update the transaction
1279 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1280 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1281 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1282 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1285 /* first see if we can replace an existing entry */
1286 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1289 if (best_el == NULL && off == el->offset+el->length) {
1293 if (off+len <= el->offset) {
1296 if (off >= el->offset + el->length) {
1300 /* an overlapping write - needs to be split into up to
1301 2 writes and a memcpy */
1302 if (off < el->offset) {
1303 partial = el->offset - off;
1304 if (transaction_write(tdb, off, buf, partial) != 0) {
1309 buf = (const void *)(partial + (const char *)buf);
1311 if (off + len <= el->offset + el->length) {
1314 partial = el->offset + el->length - off;
1316 memcpy(el->data + (off - el->offset), buf, partial);
1319 buf = (const void *)(partial + (const char *)buf);
1321 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1328 /* see if we can append the new entry to an existing entry */
1329 if (best_el && best_el->offset + best_el->length == off &&
1330 (off+len < tdb->transaction->old_map_size ||
1331 off > tdb->transaction->old_map_size)) {
1332 unsigned char *data = best_el->data;
1334 el->data = (unsigned char *)realloc(el->data,
1336 if (el->data == NULL) {
1337 tdb->ecode = TDB_ERR_OOM;
1338 tdb->transaction->transaction_error = 1;
1343 memcpy(el->data + el->length, buf, len);
1345 memset(el->data + el->length, TDB_PAD_BYTE, len);
1351 /* add a new entry at the end of the list */
1352 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1354 tdb->ecode = TDB_ERR_OOM;
1355 tdb->transaction->transaction_error = 1;
1359 el->prev = tdb->transaction->elements_last;
1362 el->data = (unsigned char *)malloc(len);
1363 if (el->data == NULL) {
1365 tdb->ecode = TDB_ERR_OOM;
1366 tdb->transaction->transaction_error = 1;
1370 memcpy(el->data, buf, len);
1372 memset(el->data, TDB_PAD_BYTE, len);
1375 el->prev->next = el;
1377 tdb->transaction->elements = el;
1379 tdb->transaction->elements_last = el;
1383 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1384 tdb->ecode = TDB_ERR_IO;
1385 tdb->transaction->transaction_error = 1;
1390 accelerated hash chain head search, using the cached hash heads
/*
  Scan the cached hash heads for a non-zero entry, stopping at
  tdb->header.hash_size. Index h+1 is used because slot 0 of hash_heads
  holds the freelist head.
  NOTE(review): how h is initialised from *chain and how the result is
  stored back is not visible in this listing.
*/
1392 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1395 for (;h < tdb->header.hash_size;h++) {
1396 /* the +1 takes account of the freelist */
1397 if (0 != tdb->transaction->hash_heads[h+1]) {
1405 out of bounds check during a transaction
/*
  Bounds check used while a transaction is active. Only tdb->map_size is
  consulted: a len within map_size takes the early branch (presumably
  success - the return is not visible here), anything larger yields
  TDB_ERRCODE(TDB_ERR_IO, -1). probe is not used in the visible lines.
*/
1407 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1409 if (len <= tdb->map_size) {
1412 return TDB_ERRCODE(TDB_ERR_IO, -1);
1416 transaction version of tdb_expand().
/*
  Expansion hook used while a transaction is active. The new region is
  recorded as a NULL-buffer transaction_write() of `addition` bytes at
  offset `size`, so that later reads inside the transaction see it.
  No file I/O is issued from the visible lines.
*/
1418 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1421 /* add a write to the transaction elements, so subsequent
1422 reads see the zero data */
1423 if (transaction_write(tdb, size, NULL, addition) != 0) {
1431 brlock during a transaction - ignore them
/*
  Byte-range lock hook with the same parameter list that tdb_brlock() is
  called with elsewhere in this file.
  NOTE(review): the body is not visible in this listing; it presumably
  does nothing and reports success - confirm.
*/
1433 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1434 int rw_type, int lck_type, int probe, size_t len)
/*
  Method table that tdb_transaction_start() installs as tdb->methods for
  the lifetime of a transaction. Only some of its entries are visible in
  this listing.
*/
1439 static const struct tdb_methods transaction_methods = {
1442 transaction_next_hash_chain,
1444 transaction_expand_file,
1450 start a tdb transaction. No token is returned, as only a single
1451 transaction is allowed to be pending per tdb_context
/*
  Begin a transaction on tdb.
  Checks, in order:
  - read-only or TDB_INTERNAL databases, or traverse_read set:
    refused with TDB_ERR_EINVAL;
  - a transaction already exists: only its nesting count is incremented;
  - chain locks or the global lock held: refused with TDB_ERR_LOCK;
  - a traverse in progress (travlocks.next set): refused with TDB_ERR_LOCK.
  Otherwise the transaction state is allocated, a blocking write lock is
  taken on TRANSACTION_LOCK and a blocking read lock starting at
  FREELIST_TOP, the hash heads are cached, the current map size is saved
  as old_map_size, tdb->methods is replaced by transaction_methods (the
  originals are kept in io_methods) and the element list is primed with
  the hash table contents.
  The failure path releases both locks and frees the transaction state.
*/
1453 int tdb_transaction_start(struct tdb_context *tdb)
1455 /* some sanity checks */
1456 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1457 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1458 tdb->ecode = TDB_ERR_EINVAL;
1462 /* cope with nested tdb_transaction_start() calls */
1463 if (tdb->transaction != NULL) {
1464 tdb->transaction->nesting++;
1465 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1466 tdb->transaction->nesting));
1470 if (tdb->num_locks != 0 || tdb->global_lock.count) {
1471 /* the caller must not have any locks when starting a
1472 transaction as otherwise we'll be screwed by lack
1473 of nested locks in posix */
1474 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1475 tdb->ecode = TDB_ERR_LOCK;
1479 if (tdb->travlocks.next != NULL) {
1480 /* you cannot use transactions inside a traverse (although you can use
1481 traverse inside a transaction) as otherwise you can end up with
1483 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1484 tdb->ecode = TDB_ERR_LOCK;
1488 tdb->transaction = (struct tdb_transaction *)
1489 calloc(sizeof(struct tdb_transaction), 1);
1490 if (tdb->transaction == NULL) {
1491 tdb->ecode = TDB_ERR_OOM;
1495 /* get the transaction write lock. This is a blocking lock. As
1496 discussed with Volker, there are a number of ways we could
1497 make this async, which we will probably do in the future */
1498 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1499 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
1500 tdb->ecode = TDB_ERR_LOCK;
1501 SAFE_FREE(tdb->transaction);
1505 /* get a read lock from the freelist to the end of file. This
1506 is upgraded to a write lock during the commit */
1507 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1508 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1509 tdb->ecode = TDB_ERR_LOCK;
1513 /* setup a copy of the hash table heads so the hash scan in
1514 traverse can be fast */
1515 tdb->transaction->hash_heads = (u32 *)
1516 calloc(tdb->header.hash_size+1, sizeof(u32));
1517 if (tdb->transaction->hash_heads == NULL) {
1518 tdb->ecode = TDB_ERR_OOM;
1521 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1522 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1523 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1524 tdb->ecode = TDB_ERR_IO;
1528 /* make sure we know about any file expansions already done by
1530 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1531 tdb->transaction->old_map_size = tdb->map_size;
1533 /* finally hook the io methods, replacing them with
1534 transaction specific methods */
1535 tdb->transaction->io_methods = tdb->methods;
1536 tdb->methods = &transaction_methods;
1538 /* by calling this transaction write here, we ensure that we don't grow the
1539 transaction linked list due to hash table updates */
1540 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1541 TDB_HASHTABLE_SIZE(tdb)) != 0) {
1542 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1543 tdb->ecode = TDB_ERR_IO;
1550 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1551 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1552 SAFE_FREE(tdb->transaction->hash_heads);
1553 SAFE_FREE(tdb->transaction);
1559 cancel the current transaction
/*
  Abandon the current transaction.
  An error is logged if no transaction is active. For a nested
  transaction only the nesting count is decremented and
  transaction_error is set, which makes a later tdb_transaction_commit()
  of the outer transaction cancel instead of committing.
  Otherwise map_size is restored to old_map_size, the element list is
  walked and released, any global lock or chain locks taken during the
  transaction are unlocked, the original io methods are restored, the
  FREELIST_TOP and TRANSACTION_LOCK locks are dropped and the transaction
  state is freed.
*/
1561 int tdb_transaction_cancel(struct tdb_context *tdb)
1563 if (tdb->transaction == NULL) {
1564 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1568 if (tdb->transaction->nesting != 0) {
1569 tdb->transaction->transaction_error = 1;
1570 tdb->transaction->nesting--;
1574 tdb->map_size = tdb->transaction->old_map_size;
1576 /* free all the transaction elements */
1577 while (tdb->transaction->elements) {
1578 struct tdb_transaction_el *el = tdb->transaction->elements;
1579 tdb->transaction->elements = el->next;
1584 /* remove any global lock created during the transaction */
1585 if (tdb->global_lock.count != 0) {
1586 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1587 tdb->global_lock.count = 0;
1590 /* remove any locks created during the transaction */
1591 if (tdb->num_locks != 0) {
1593 for (i=0;i<tdb->num_lockrecs;i++) {
1594 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1595 F_UNLCK,F_SETLKW, 0, 1);
1600 /* restore the normal io methods */
1601 tdb->methods = tdb->transaction->io_methods;
1603 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1604 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1605 SAFE_FREE(tdb->transaction->hash_heads);
1606 SAFE_FREE(tdb->transaction);
/*
  Flush changes to stable storage. fsync() is called on the file
  descriptor, and msync(MS_SYNC) on the mapping from the page boundary at
  or below offset through offset+length (presumably only when the file
  is mmap-ed - the guard is not visible in this listing).
  Either failure sets TDB_ERR_IO and is logged.
*/
1614 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1616 if (fsync(tdb->fd) != 0) {
1617 tdb->ecode = TDB_ERR_IO;
1618 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1623 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1624 if (msync(moffset + (char *)tdb->map_ptr,
1625 length + (offset - moffset), MS_SYNC) != 0) {
1626 tdb->ecode = TDB_ERR_IO;
1627 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1638 work out how much space the linearised recovery data will consume
/*
  Return the number of bytes the linearised recovery data needs: one u32
  plus, for every counted element, two tdb_off_t fields and the element
  data length. Elements at or beyond old_map_size are tested separately
  (presumably skipped - the branch body is not visible in this listing).
*/
1640 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1642 struct tdb_transaction_el *el;
1643 tdb_len_t recovery_size = 0;
1645 recovery_size = sizeof(u32);
1646 for (el=tdb->transaction->elements;el;el=el->next) {
1647 if (el->offset >= tdb->transaction->old_map_size) {
1650 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1653 return recovery_size;
1657 allocate the recovery area, or use an existing recovery area if it is
/*
  Find or create the on-disk recovery area.
  Outputs:
    *recovery_size     - bytes of recovery data needed (tdb_recovery_size)
    *recovery_offset   - file offset of the recovery record header
    *recovery_max_size - capacity of the recovery area
  If TDB_RECOVERY_HEAD already points at a record whose rec_len is large
  enough, that area is reused. Otherwise any old area is released with
  tdb_free(), the size is recomputed, a page-rounded area is added at the
  current end of the map using the original (non-transaction) expand
  method, old_map_size is advanced to the new map size, and the new
  offset is written to TDB_RECOVERY_HEAD through the original write
  method.
*/
1660 static int tdb_recovery_allocate(struct tdb_context *tdb,
1661 tdb_len_t *recovery_size,
1662 tdb_off_t *recovery_offset,
1663 tdb_len_t *recovery_max_size)
1665 struct list_struct rec;
1666 const struct tdb_methods *methods = tdb->transaction->io_methods;
1667 tdb_off_t recovery_head;
1669 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1670 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1676 if (recovery_head != 0 &&
1677 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1678 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1682 *recovery_size = tdb_recovery_size(tdb);
1684 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1685 /* it fits in the existing area */
1686 *recovery_max_size = rec.rec_len;
1687 *recovery_offset = recovery_head;
1691 /* we need to free up the old recovery area, then allocate a
1692 new one at the end of the file. Note that we cannot use
1693 tdb_allocate() to allocate the new one as that might return
1694 us an area that is being currently used (as of the start of
1696 if (recovery_head != 0) {
1697 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1698 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1703 /* the tdb_free() call might have increased the recovery size */
1704 *recovery_size = tdb_recovery_size(tdb);
1706 /* round up to a multiple of page size */
1707 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1708 *recovery_offset = tdb->map_size;
1709 recovery_head = *recovery_offset;
1711 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1712 (tdb->map_size - tdb->transaction->old_map_size) +
1713 sizeof(rec) + *recovery_max_size) == -1) {
1714 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1718 /* remap the file (if using mmap) */
1719 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1721 /* we have to reset the old map size so that we don't try to expand the file
1722 again in the transaction commit, which would destroy the recovery area */
1723 tdb->transaction->old_map_size = tdb->map_size;
1725 /* write the recovery header offset and sync - we can sync without a race here
1726 as the magic ptr in the recovery record has not been set */
1727 CONVERT(recovery_head);
1728 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1729 &recovery_head, sizeof(tdb_off_t)) == -1) {
1730 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1739 setup the recovery data that will be used on a crash during commit
/*
  Write the recovery blob for the current transaction and arm it.
  A single heap buffer is built holding a list_struct header (data_len =
  recovery size, rec_len = area capacity, key_len = old map size),
  followed by one [offset, length, old data] entry for each element below
  old_map_size, followed by a tailer. The old data is fetched with the
  original tdb_read so it reflects the file contents, not the
  transaction. An element that crosses old_map_size is reported as
  TDB_ERR_CORRUPT.
  The blob is written to the recovery area and synced; only then is
  TDB_RECOVERY_MAGIC written at *magic_offset (the magic field of the
  header) and synced, so the marker reaches disk after the data it
  validates.
*/
1741 static int transaction_setup_recovery(struct tdb_context *tdb,
1742 tdb_off_t *magic_offset)
1744 struct tdb_transaction_el *el;
1745 tdb_len_t recovery_size;
1746 unsigned char *data, *p;
1747 const struct tdb_methods *methods = tdb->transaction->io_methods;
1748 struct list_struct *rec;
1749 tdb_off_t recovery_offset, recovery_max_size;
1750 tdb_off_t old_map_size = tdb->transaction->old_map_size;
1754 check that the recovery area has enough space
1756 if (tdb_recovery_allocate(tdb, &recovery_size,
1757 &recovery_offset, &recovery_max_size) == -1) {
1761 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1763 tdb->ecode = TDB_ERR_OOM;
1767 rec = (struct list_struct *)data;
1768 memset(rec, 0, sizeof(*rec));
1771 rec->data_len = recovery_size;
1772 rec->rec_len = recovery_max_size;
1773 rec->key_len = old_map_size;
1776 /* build the recovery data into a single blob to allow us to do a single
1777 large write, which should be more efficient */
1778 p = data + sizeof(*rec);
1779 for (el=tdb->transaction->elements;el;el=el->next) {
1780 if (el->offset >= old_map_size) {
1783 if (el->offset + el->length > tdb->transaction->old_map_size) {
1784 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1786 tdb->ecode = TDB_ERR_CORRUPT;
1789 memcpy(p, &el->offset, 4);
1790 memcpy(p+4, &el->length, 4);
1794 /* the recovery area contains the old data, not the
1795 new data, so we have to call the original tdb_read
1797 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1799 tdb->ecode = TDB_ERR_IO;
1802 p += 8 + el->length;
1805 /* and the tailer */
1806 tailer = sizeof(*rec) + recovery_max_size;
1807 memcpy(p, &tailer, 4);
1810 /* write the recovery data to the recovery area */
1811 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1812 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1814 tdb->ecode = TDB_ERR_IO;
1818 /* as we don't have ordered writes, we have to sync the recovery
1819 data before we update the magic to indicate that the recovery
1821 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1828 magic = TDB_RECOVERY_MAGIC;
1831 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1833 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1834 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1835 tdb->ecode = TDB_ERR_IO;
1839 /* ensure the recovery magic marker is on disk */
1840 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1848 commit the current transaction
/*
  Commit the current transaction.
  Early exits:
  - no transaction: an error is logged;
  - transaction_error pending: the transaction is cancelled with
    TDB_ERR_IO;
  - nested transaction: only the nesting count is decremented;
  - no elements recorded: the transaction is simply cancelled.
  Otherwise, provided no chain or global locks are outstanding, the
  FREELIST_TOP lock is upgraded to a write lock and GLOBAL_LOCK is taken.
  Unless TDB_NOSYNC is set, recovery data is written first. The file is
  expanded if the map grew, every element is written through the original
  io methods, the data is synced and the recovery magic is cleared and
  synced. GLOBAL_LOCK is then dropped, utime() forces an mtime change, and
  tdb_transaction_cancel() is reused to free memory and drop the
  transaction locks.
  A write failure inside the element loop restores the original methods
  and runs tdb_transaction_recover() before cancelling.
*/
1850 int tdb_transaction_commit(struct tdb_context *tdb)
1852 const struct tdb_methods *methods;
1853 tdb_off_t magic_offset = 0;
1856 if (tdb->transaction == NULL) {
1857 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1861 if (tdb->transaction->transaction_error) {
1862 tdb->ecode = TDB_ERR_IO;
1863 tdb_transaction_cancel(tdb);
1864 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1868 if (tdb->transaction->nesting != 0) {
1869 tdb->transaction->nesting--;
1873 /* check for a null transaction */
1874 if (tdb->transaction->elements == NULL) {
1875 tdb_transaction_cancel(tdb);
1879 methods = tdb->transaction->io_methods;
1881 /* if there are any locks pending then the caller has not
1882 nested their locks properly, so fail the transaction */
1883 if (tdb->num_locks || tdb->global_lock.count) {
1884 tdb->ecode = TDB_ERR_LOCK;
1885 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
1886 tdb_transaction_cancel(tdb);
1890 /* upgrade the main transaction lock region to a write lock */
1891 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
1892 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
1893 tdb->ecode = TDB_ERR_LOCK;
1894 tdb_transaction_cancel(tdb);
1898 /* get the global lock - this prevents new users attaching to the database
1899 during the commit */
1900 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1901 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
1902 tdb->ecode = TDB_ERR_LOCK;
1903 tdb_transaction_cancel(tdb);
1907 if (!(tdb->flags & TDB_NOSYNC)) {
1908 /* write the recovery data to the end of the file */
1909 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
1910 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
1911 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1912 tdb_transaction_cancel(tdb);
1917 /* expand the file to the new size if needed */
1918 if (tdb->map_size != tdb->transaction->old_map_size) {
1919 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1921 tdb->transaction->old_map_size) == -1) {
1922 tdb->ecode = TDB_ERR_IO;
1923 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
1924 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1925 tdb_transaction_cancel(tdb);
1928 tdb->map_size = tdb->transaction->old_map_size;
1929 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1932 /* perform all the writes */
1933 while (tdb->transaction->elements) {
1934 struct tdb_transaction_el *el = tdb->transaction->elements;
1936 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
1937 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1939 /* we've overwritten part of the data and
1940 possibly expanded the file, so we need to
1941 run the crash recovery code */
1942 tdb->methods = methods;
1943 tdb_transaction_recover(tdb);
1945 tdb_transaction_cancel(tdb);
1946 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1948 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1951 tdb->transaction->elements = el->next;
1956 if (!(tdb->flags & TDB_NOSYNC)) {
1957 /* ensure the new data is on disk */
1958 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1962 /* remove the recovery marker */
1963 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
1964 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
1968 /* ensure the recovery marker has been removed on disk */
1969 if (transaction_sync(tdb, magic_offset, 4) == -1) {
1974 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1977 TODO: maybe write to some dummy hdr field, or write to magic
1978 offset without mmap, before the last sync, instead of the
1982 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1983 don't change the mtime of the file, this means the file may
1984 not be backed up (as tdb rounding to block sizes means that
1985 file size changes are quite rare too). The following forces
1986 mtime changes when a transaction completes */
1988 utime(tdb->name, NULL);
1991 /* use a transaction cancel to free memory and remove the
1992 transaction locks */
1993 tdb_transaction_cancel(tdb);
1999 recover from an aborted transaction. Must be called with exclusive
2000 database write access already established (including the global
2001 lock to prevent new processes attaching)
/*
  Replay the recovery area left behind by an interrupted commit.
  Nothing needs replaying when TDB_RECOVERY_HEAD is zero or when the
  recovery record magic is not TDB_RECOVERY_MAGIC. A read-only database
  that does need recovery is reported as TDB_ERR_CORRUPT.
  Otherwise the recovery data is read into a heap buffer and each
  [offset, length, data] entry is written back to the file. The file is
  synced, the recovery head is cleared when the recovery area lies at or
  beyond the recovered end of file, the recovery magic is removed, the
  file is truncated to the recorded size (rec.key_len) and synced again.
  I/O failures are logged and set TDB_ERR_IO.
*/
2003 int tdb_transaction_recover(struct tdb_context *tdb)
2005 tdb_off_t recovery_head, recovery_eof;
2006 unsigned char *data, *p;
2008 struct list_struct rec;
2010 /* find the recovery area */
2011 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2012 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2013 tdb->ecode = TDB_ERR_IO;
2017 if (recovery_head == 0) {
2018 /* we have never allocated a recovery record */
2022 /* read the recovery record */
2023 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2024 sizeof(rec), DOCONV()) == -1) {
2025 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2026 tdb->ecode = TDB_ERR_IO;
2030 if (rec.magic != TDB_RECOVERY_MAGIC) {
2031 /* there is no valid recovery data */
2035 if (tdb->read_only) {
2036 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2037 tdb->ecode = TDB_ERR_CORRUPT;
2041 recovery_eof = rec.key_len;
2043 data = (unsigned char *)malloc(rec.data_len);
2045 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2046 tdb->ecode = TDB_ERR_OOM;
2050 /* read the full recovery data */
2051 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2052 rec.data_len, 0) == -1) {
2053 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2054 tdb->ecode = TDB_ERR_IO;
2058 /* recover the file data */
2060 while (p+8 < data + rec.data_len) {
2066 memcpy(&len, p+4, 4);
2068 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2070 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2071 tdb->ecode = TDB_ERR_IO;
2079 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2080 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2081 tdb->ecode = TDB_ERR_IO;
2085 /* if the recovery area is after the recovered eof then remove it */
2086 if (recovery_eof <= recovery_head) {
2087 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2088 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2089 tdb->ecode = TDB_ERR_IO;
2094 /* remove the recovery magic */
2095 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2097 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2098 tdb->ecode = TDB_ERR_IO;
2102 /* reduce the file size to the old size */
2104 if (ftruncate(tdb->fd, recovery_eof) != 0) {
2105 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2106 tdb->ecode = TDB_ERR_IO;
2109 tdb->map_size = recovery_eof;
2112 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2113 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2114 tdb->ecode = TDB_ERR_IO;
2118 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2125 /* file: freelist.c */
2127 /* read a freelist record and check for simple errors */
/*
  Read the record header at off into *rec and validate it as a freelist
  entry. A header carrying TDB_MAGIC (an in-use record) is logged,
  changed to TDB_FREE_MAGIC and written back. Any magic other than
  TDB_FREE_MAGIC after that is reported as TDB_ERR_CORRUPT. Finally
  rec->next + sizeof(*rec) is bounds-checked with tdb_oob.
*/
2128 static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2130 if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2133 if (rec->magic == TDB_MAGIC) {
2134 /* this happens when an app is shut down while deleting a record - we should
2135 not completely fail when this happens */
2136 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2138 rec->magic = TDB_FREE_MAGIC;
2139 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2143 if (rec->magic != TDB_FREE_MAGIC) {
2144 /* Ensure ecode is set for log fn. */
2145 tdb->ecode = TDB_ERR_CORRUPT;
2146 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read bad magic 0x%x at offset=%d\n",
2148 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2150 if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2157 /* Remove an element from the freelist. Must have alloc lock. */
/*
  Unlink the record at off from the freelist. The list is walked from
  FREELIST_TOP; once the entry is found, the pointer that referenced it
  (at last_ptr) is overwritten with `next` and the result of that write
  is returned. If the walk ends without a match the error is logged and
  TDB_ERRCODE(TDB_ERR_CORRUPT, -1) is returned.
*/
2158 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2160 tdb_off_t last_ptr, i;
2162 /* read in the freelist top */
2163 last_ptr = FREELIST_TOP;
2164 while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2166 /* We've found it! */
2167 return tdb_ofs_write(tdb, last_ptr, &next);
2169 /* Follow chain (next offset is at start of record) */
2172 TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2173 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2177 /* update a record tailer (must hold allocation lock) */
/*
  Write the tailer of the record at offset. The tailer occupies the last
  sizeof(tdb_off_t) bytes of the sizeof(*rec) + rec->rec_len bytes that
  make up the record. The value written is on a line not visible in this
  listing (presumably totalsize). Returns the result of tdb_ofs_write().
*/
2178 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2179 const struct list_struct *rec)
2181 tdb_off_t totalsize;
2183 /* Offset of tailer from record header */
2184 totalsize = sizeof(*rec) + rec->rec_len;
2185 return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2189 /* Add an element into the freelist. Merge adjacent records if
/*
  Return the record at offset (header *rec) to the freelist.
  The allocation lock (list -1) is taken and an initial tailer written.
  The record is then coalesced with its neighbours where possible:
  - right: the header directly after this record, if it lies within
    map_size and carries TDB_FREE_MAGIC, is removed from the freelist and
    its size added to rec->rec_len;
  - left: the tailer stored just before offset gives the size of the
    preceding record; a value of 0 or TDB_PAD_U32 is treated as
    uninitialised data, otherwise a preceding record with TDB_FREE_MAGIC
    is removed from the freelist and merged.
  The resulting record gets a fresh tailer, is marked TDB_FREE_MAGIC and
  is pushed on the front of the freelist. The lock is released on both
  the success and the failure path.
*/
2191 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2193 tdb_off_t right, left;
2195 /* Allocation and tailer lock */
2196 if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2199 /* set an initial tailer, so if we fail we don't leave a bogus record */
2200 if (update_tailer(tdb, offset, rec) != 0) {
2201 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2205 /* Look right first (I'm an Australian, dammit) */
2206 right = offset + sizeof(*rec) + rec->rec_len;
2207 if (right + sizeof(*rec) <= tdb->map_size) {
2208 struct list_struct r;
2210 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2211 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2215 /* If it's free, expand to include it. */
2216 if (r.magic == TDB_FREE_MAGIC) {
2217 if (remove_from_freelist(tdb, right, r.next) == -1) {
2218 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2221 rec->rec_len += sizeof(r) + r.rec_len;
2227 left = offset - sizeof(tdb_off_t);
2228 if (left > TDB_DATA_START(tdb->header.hash_size)) {
2229 struct list_struct l;
2232 /* Read in tailer and jump back to header */
2233 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2234 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2238 /* it could be uninitialised data */
2239 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2243 left = offset - leftsize;
2245 /* Now read in record */
2246 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2247 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2251 /* If it's free, expand to include it. */
2252 if (l.magic == TDB_FREE_MAGIC) {
2253 if (remove_from_freelist(tdb, left, l.next) == -1) {
2254 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2258 rec->rec_len += leftsize;
2264 if (update_tailer(tdb, offset, rec) == -1) {
2265 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2269 /* Now, prepend to free list */
2270 rec->magic = TDB_FREE_MAGIC;
2272 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2273 tdb_rec_write(tdb, offset, rec) == -1 ||
2274 tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2275 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2279 /* And we're done. */
2280 tdb_unlock(tdb, -1, F_WRLCK);
2284 tdb_unlock(tdb, -1, F_WRLCK);
2290 the core of tdb_allocate - called when we have decided which
2291 free list entry to use
/*
  Carve an allocation of `length` bytes out of the free record *rec
  located at rec_ptr; last_ptr is the offset of the pointer that links to
  this record in the freelist.
  If the record is larger than length + MIN_REC_SIZE it is split: the
  left part keeps `length` rounded up to TDB_ALIGNMENT and the remainder
  is described by newrec at newrec_ptr. The record is unlinked from the
  freelist by writing rec->next at last_ptr and is rewritten with
  TDB_MAGIC. When a split happened, the tailer of the allocated record is
  updated and the remainder is handed to tdb_free().
*/
2293 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2294 struct list_struct *rec, tdb_off_t last_ptr)
2296 struct list_struct newrec;
2297 tdb_off_t newrec_ptr;
2299 memset(&newrec, '\0', sizeof(newrec));
2301 /* found it - now possibly split it up */
2302 if (rec->rec_len > length + MIN_REC_SIZE) {
2303 /* Length of left piece */
2304 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2306 /* Right piece to go on free list */
2307 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2308 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2310 /* And left record is shortened */
2311 rec->rec_len = length;
2316 /* Remove allocated record from the free list */
2317 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2321 /* Update header: do this before we drop alloc
2322 lock, otherwise tdb_free() might try to
2323 merge with us, thinking we're free.
2324 (Thanks Jeremy Allison). */
2325 rec->magic = TDB_MAGIC;
2326 if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2330 /* Did we create new block? */
2332 /* Update allocated record tailer (we
2334 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2338 /* Free new record */
2339 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2344 /* all done - return the new record offset */
2348 /* allocate some space from the free list. The offset returned points
2349 to an unconnected list_struct within the database with room for at
2350 least length bytes of total data
2352 0 is returned if the space could not be allocated
/*
  Best-fit allocation from the freelist.
  Under the allocation lock (list -1), length is increased by
  sizeof(tdb_off_t) for the tailer and the freelist is walked from
  FREELIST_TOP, remembering the smallest record whose rec_len is at
  least length. A candidate smaller than 2*length is treated as good
  enough. A chosen record is re-read with rec_free_read() and passed to
  tdb_allocate_ofs(), after which the lock is released.
  If nothing fits, tdb_expand() is called with length + sizeof(*rec)
  (presumably followed by a retry, as the comment in the body says - the
  jump is not visible in this listing).
*/
2354 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2356 tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2358 tdb_off_t rec_ptr, last_ptr;
2362 if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2365 /* Extra bytes required for tailer */
2366 length += sizeof(tdb_off_t);
2369 last_ptr = FREELIST_TOP;
2371 /* read in the freelist top */
2372 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2375 bestfit.rec_ptr = 0;
2376 bestfit.last_ptr = 0;
2377 bestfit.rec_len = 0;
2380 this is a best fit allocation strategy. Originally we used
2381 a first fit strategy, but it suffered from massive fragmentation
2382 issues when faced with a slowly increasing record size.
2385 if (rec_free_read(tdb, rec_ptr, rec) == -1) {
2389 if (rec->rec_len >= length) {
2390 if (bestfit.rec_ptr == 0 ||
2391 rec->rec_len < bestfit.rec_len) {
2392 bestfit.rec_len = rec->rec_len;
2393 bestfit.rec_ptr = rec_ptr;
2394 bestfit.last_ptr = last_ptr;
2395 /* consider a fit to be good enough if
2396 we aren't wasting more than half
2398 if (bestfit.rec_len < 2*length) {
2404 /* move to the next record */
2406 rec_ptr = rec->next;
2409 if (bestfit.rec_ptr != 0) {
2410 if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2414 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2415 tdb_unlock(tdb, -1, F_WRLCK);
2419 /* we didn't find enough space. See if we can expand the
2420 database and if we can then try again */
2421 if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2424 tdb_unlock(tdb, -1, F_WRLCK);
2428 /* file: freelistcheck.c */
2430 /* Check the freelist is good and contains no loops.
2431 Very memory intensive - only do this as a consistency
2432 checker. Heh heh - uses an in memory tdb as the storage
2433 for the "seen" record list. For some reason this strikes
2434 me as extremely clever as I don't have to write another tree
2435 data structure implementation :-).
/*
  Record rec_ptr as seen: its raw bytes are used as the key of a record
  with an all-zero data value, stored into mem_tdb with the TDB_INSERT
  flag. Returns the result of tdb_store().
*/
2438 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2442 memset(&data, '\0', sizeof(data));
2443 key.dptr = (unsigned char *)&rec_ptr;
2444 key.dsize = sizeof(rec_ptr);
2445 return tdb_store(mem_tdb, key, data, TDB_INSERT);
/*
  Walk the freelist of tdb under the allocation lock (list -1), inserting
  every offset visited, starting with FREELIST_TOP, into a temporary
  TDB_INTERNAL database opened with the same hash size. A failed insert
  is reported as TDB_ERR_CORRUPT; per the comment in the body, a repeated
  offset means the freelist contains a loop. Each record is validated
  with rec_free_read().
  NOTE(review): how pnum_entries is updated and how mem_tdb is released
  is not visible in this listing.
*/
2448 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2450 struct tdb_context *mem_tdb = NULL;
2451 struct list_struct rec;
2452 tdb_off_t rec_ptr, last_ptr;
2457 mem_tdb = tdb_open("flval", tdb->header.hash_size,
2458 TDB_INTERNAL, O_RDWR, 0600);
2463 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2468 last_ptr = FREELIST_TOP;
2470 /* Store the FREELIST_TOP record. */
2471 if (seen_insert(mem_tdb, last_ptr) == -1) {
2472 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2476 /* read in the freelist top */
2477 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2483 /* If we can't store this record (we've seen it
2484 before) then the free list has a loop and must
2487 if (seen_insert(mem_tdb, rec_ptr)) {
2488 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2492 if (rec_free_read(tdb, rec_ptr, &rec) == -1) {
2496 /* move to the next record */
2507 tdb_unlock(tdb, -1, F_WRLCK);
2511 /* file: traverse.c */
2513 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
// Advance the traverse cursor in tlock to the next live record and read its
// header into rec. Chains are locked with tlock->lock_rw. Dead records met
// on the way are handed to tdb_do_delete() unless the database is read-only
// or a read traverse is active.
// NOTE(review): the return statements and goto targets are on lines missing
// from this excerpt; the return convention is the one stated in the comment
// placed directly above this function.
2514 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2515 struct list_struct *rec)
// want_next is set when the cursor already sits on a record, i.e. the caller
// wants the record after tlock->off rather than the first one of a chain.
2517 int want_next = (tlock->off != 0);
2519 /* Lock each chain from the start one. */
2520 for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2521 if (!tlock->off && tlock->hash != 0) {
2522 /* this is an optimisation for the common case where
2523 the hash chain is empty, which is particularly
2524 common for the use of tdb with ldb, where large
2525 hashes are used. In that case we spend most of our
2526 time in tdb_brlock(), locking empty hash chains.
2528 To avoid this, we do an unlocked pre-check to see
2529 if the hash chain is empty before starting to look
2530 inside it. If it is empty then we can avoid that
2531 hash chain. If it isn't empty then we can't believe
2532 the value we get back, as we read it without a
2533 lock, so instead we get the lock and re-fetch the
2536 Notice that not doing this optimisation on the
2537 first hash chain is critical. We must guarantee
2538 that we have done at least one fcntl lock at the
2539 start of a search to guarantee that memory is
2540 coherent on SMP systems. If records are added by
2541 others during the search then thats OK, and we
2542 could possibly miss those with this trick, but we
2543 could miss them anyway without this trick, so the
2544 semantics don't change.
2546 With a non-indexed ldb search this trick gains us a
2547 factor of around 80 in speed on a linux 2.6.x
2548 system (testing using ldbtest).
2550 tdb->methods->next_hash_chain(tdb, &tlock->hash);
2551 if (tlock->hash == tdb->header.hash_size) {
2556 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2559 /* No previous record? Start at top of chain. */
2561 if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2565 /* Otherwise unlock the previous record. */
2566 if (tdb_unlock_record(tdb, tlock->off) != 0)
2571 /* We have offset of old record: grab next */
2572 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2574 tlock->off = rec->next;
2577 /* Iterate through chain */
2578 while( tlock->off) {
2580 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
// Only a record whose next pointer refers to itself is caught by this test.
2583 /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2584 if (tlock->off == rec->next) {
2585 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2589 if (!TDB_DEAD(rec)) {
2590 /* Woohoo: we found one! */
2591 if (tdb_lock_record(tdb, tlock->off) != 0)
// The cursor is moved to rec->next before the dead record is deleted.
2596 /* Try to clean dead ones from old traverses */
2597 current = tlock->off;
2598 tlock->off = rec->next;
2599 if (!(tdb->read_only || tdb->traverse_read) &&
2600 tdb_do_delete(tdb, current, rec) != 0)
2603 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2606 /* We finished iteration without finding anything */
2607 return TDB_ERRCODE(TDB_SUCCESS, 0);
2611 if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2612 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2616 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2617 return -1 on error or the record count traversed
2618 if fn is NULL then it is not called
2619 a non-zero return value from fn() indicates that the traversal should stop
// Shared worker for tdb_traverse() and tdb_traverse_read(): links tl into
// the tdb->travlocks list, then repeatedly calls tdb_next_lock() and hands
// the key and data of each record to fn. tl is unlinked again at the end.
// Key and data are read into one buffer by tdb_alloc_read(); dbuf.dptr
// points rec.key_len bytes into it, which is why only key.dptr is freed.
// NOTE(review): the counting and return lines are missing from this excerpt.
2621 static int tdb_traverse_internal(struct tdb_context *tdb,
2622 tdb_traverse_func fn, void *private_data,
2623 struct tdb_traverse_lock *tl)
2626 struct list_struct rec;
2629 /* This was in the initialization, above, but the IRIX compiler
2630 * did not like it. crh
2632 tl->next = tdb->travlocks.next;
2634 /* fcntl locks don't stack: beware traverse inside traverse */
2635 tdb->travlocks.next = tl;
2637 /* tdb_next_lock places locks on the record returned, and its chain */
2638 while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2640 /* now read the full record */
2641 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2642 rec.key_len + rec.data_len);
2645 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2647 if (tdb_unlock_record(tdb, tl->off) != 0)
2648 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2651 key.dsize = rec.key_len;
2652 dbuf.dptr = key.dptr + rec.key_len;
2653 dbuf.dsize = rec.data_len;
// The chain lock is released before fn runs; the record lock is released
// only when fn asks to stop (see below).
2655 /* Drop chain lock, call out */
2656 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2658 SAFE_FREE(key.dptr);
2661 if (fn && fn(tdb, key, dbuf, private_data)) {
2662 /* They want us to terminate traversal */
2664 if (tdb_unlock_record(tdb, tl->off) != 0) {
2665 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2668 SAFE_FREE(key.dptr);
2671 SAFE_FREE(key.dptr);
2674 tdb->travlocks.next = tl->next;
2683 a read style traverse - temporarily marks the db read only
// Read-only traversal: takes a read lock on TRANSACTION_LOCK, increments
// tdb->traverse_read around the call to tdb_traverse_internal() (run with an
// F_RDLCK traverse lock), then releases the transaction lock.
// While traverse_read is non-zero, tdb_store() and tdb_do_delete() refuse
// to modify the database.
2685 int tdb_traverse_read(struct tdb_context *tdb,
2686 tdb_traverse_func fn, void *private_data)
2688 struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2691 /* we need to get a read lock on the transaction lock here to
2692 cope with the lock ordering semantics of solaris10 */
2693 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) {
2694 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n"));
2695 tdb->ecode = TDB_ERR_LOCK;
2699 tdb->traverse_read++;
2700 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2701 tdb->traverse_read--;
2703 tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2709 a write style traverse - needs to get the transaction lock to
// Write-capable traversal. Delegates to tdb_traverse_read() when the
// database is read-only or a read traverse is already active; otherwise
// holds a write lock on TRANSACTION_LOCK while tdb_traverse_internal() runs
// with an F_WRLCK traverse lock.
2712 int tdb_traverse(struct tdb_context *tdb,
2713 tdb_traverse_func fn, void *private_data)
2715 struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2718 if (tdb->read_only || tdb->traverse_read) {
2719 return tdb_traverse_read(tdb, fn, private_data);
2722 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2723 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n"));
2724 tdb->ecode = TDB_ERR_LOCK;
2728 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2730 tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2736 /* find the first entry in the database and return its key */
2737 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2740 struct list_struct rec;
2742 /* release any old lock */
2743 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2745 tdb->travlocks.off = tdb->travlocks.hash = 0;
2746 tdb->travlocks.lock_rw = F_RDLCK;
2748 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2750 /* now read the key */
2751 key.dsize = rec.key_len;
2752 key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2753 if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
2754 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2758 /* find the next entry in the database, returning its key */
2759 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2762 TDB_DATA key = tdb_null;
2763 struct list_struct rec;
2764 unsigned char *k = NULL;
2766 /* Is locked key the old key? If so, traverse will be reliable. */
2767 if (tdb->travlocks.off) {
2768 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
2770 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2771 || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2773 || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2774 /* No, it wasn't: unlock it and start from scratch */
2775 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2779 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0) {
2783 tdb->travlocks.off = 0;
2789 if (!tdb->travlocks.off) {
2790 /* No previous element: do normal find, and lock record */
2791 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
2792 if (!tdb->travlocks.off)
2794 tdb->travlocks.hash = BUCKET(rec.full_hash);
2795 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2796 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2800 oldhash = tdb->travlocks.hash;
2802 /* Grab next record: locks chain and returned record,
2803 unlocks old record */
2804 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2805 key.dsize = rec.key_len;
2806 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2808 /* Unlock the chain of this new record */
2809 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
2810 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2812 /* Unlock the chain of old record */
2813 if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
2814 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
// Print the header fields of the record at offset to stdout and check that
// the tailer, read from the last tdb_off_t of the record, equals
// rec.rec_len + sizeof(rec).
// NOTE(review): the return statements are missing from this excerpt.
// tdb_dump_chain() assigns the result back to its rec_ptr, so presumably the
// offset of the next record is returned - confirm against the full source.
2820 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
2822 struct list_struct rec;
2823 tdb_off_t tailer_ofs, tailer;
2825 if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2826 sizeof(rec), DOCONV()) == -1) {
2827 printf("ERROR: failed to read record at %u\n", offset);
2831 printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2832 offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
2834 tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2836 if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2837 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2841 if (tailer != rec.rec_len + sizeof(rec)) {
2842 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2843 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
// Print the records of list i through tdb_dump_record() while holding the
// lock for that list. tdb_dump_all() passes -1 to dump the free list.
// Returns the result of tdb_unlock() once the walk is over or when the list
// head cannot be read.
// NOTE(review): the loop header and the lock-failure return are missing here.
2848 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2850 tdb_off_t rec_ptr, top;
2852 top = TDB_HASH_TOP(i);
2854 if (tdb_lock(tdb, i, F_WRLCK) != 0)
2857 if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2858 return tdb_unlock(tdb, i, F_WRLCK);
2861 printf("hash=%d\n", i);
2864 rec_ptr = tdb_dump_record(tdb, rec_ptr);
2867 return tdb_unlock(tdb, i, F_WRLCK);
2870 void tdb_dump_all(struct tdb_context *tdb)
2873 for (i=0;i<tdb->header.hash_size;i++) {
2874 tdb_dump_chain(tdb, i);
2876 printf("freelist:\n");
2877 tdb_dump_chain(tdb, -1);
// Print every free-list entry and the sum of their rec_len values to stdout
// while holding the free-list lock (list -1).
// An entry whose magic is not TDB_FREE_MAGIC is reported and the lock is
// dropped. Returns the tdb_lock() result if locking fails and the
// tdb_unlock() result at the end.
// NOTE(review): the loop header and the error returns are missing here.
2880 int tdb_printfreelist(struct tdb_context *tdb)
2883 long total_free = 0;
2884 tdb_off_t offset, rec_ptr;
2885 struct list_struct rec;
2887 if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
2890 offset = FREELIST_TOP;
2892 /* read in the freelist top */
2893 if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
2894 tdb_unlock(tdb, -1, F_WRLCK);
2898 printf("freelist top=[0x%08x]\n", rec_ptr );
2900 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
2901 sizeof(rec), DOCONV()) == -1) {
2902 tdb_unlock(tdb, -1, F_WRLCK);
2906 if (rec.magic != TDB_FREE_MAGIC) {
2907 printf("bad magic 0x%08x in free list\n", rec.magic);
2908 tdb_unlock(tdb, -1, F_WRLCK);
2912 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
2913 rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
2914 total_free += rec.rec_len;
2916 /* move to the next record */
2919 printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
2922 return tdb_unlock(tdb, -1, F_WRLCK);
2930 increment the tdb sequence number if the tdb has been opened using
// Update the sequence number stored at TDB_SEQNUM_OFS: take a write lock on
// that offset, read the value, write it back and unlock. Only acts when
// TDB_SEQNUM is set in tdb->flags; errors from the read and write are
// deliberately ignored.
// NOTE(review): the line between the read and the write (presumably the
// increment itself) is missing from this excerpt.
2933 static void tdb_increment_seqnum(struct tdb_context *tdb)
2937 if (!(tdb->flags & TDB_SEQNUM)) {
2941 if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
2945 /* we ignore errors from this, as we have no sane way of
2948 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
2950 tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
2952 tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
2955 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
2957 return memcmp(data.dptr, key.dptr, data.dsize);
2960 /* Returns 0 on fail. On success, return offset of record, and fills
// Search the chain for hash for a record that is not dead and whose full
// hash, key length and key bytes all match; the key bytes are compared
// through tdb_parse_data() with tdb_key_compare as the callback. The header
// of each record examined is read into r.
// No lock is taken here: tdb_find_lock_hash() locks the chain before
// calling this function.
// NOTE(review): the loop header and the success return are missing here.
2962 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
2963 struct list_struct *r)
2967 /* read in the hash top */
2968 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
2971 /* keep looking until we find the right record */
2973 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
2976 if (!TDB_DEAD(r) && hash==r->full_hash
2977 && key.dsize==r->key_len
2978 && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
2979 r->key_len, tdb_key_compare,
2985 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
2988 /* As tdb_find, but if you succeed, keep the lock */
// Lock the chain BUCKET(hash) with locktype, then look key up with
// tdb_find(). When the key is not found the chain lock is released again;
// on success the caller owns the lock and must release it.
2989 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
2990 struct list_struct *rec)
2994 if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
2996 if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
2997 tdb_unlock(tdb, BUCKET(hash), locktype);
3002 /* update an entry in place - this only works if the new data size
3003 is <= the old data size and the key exists.
3004 on failure return -1.
// Overwrite the data of an existing record in place. The record must be
// found by tdb_find() and rec.rec_len must be able to hold the key, the new
// data and a tdb_off_t tailer. The header is rewritten only when the data
// length changed.
// When the record is too small tdb->ecode is reset to TDB_SUCCESS so that
// the caller can tell this case apart from a missing key.
3006 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3008 struct list_struct rec;
3012 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3015 /* must be long enough key, data and tailer */
3016 if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3017 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3021 if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3022 dbuf.dptr, dbuf.dsize) == -1)
3025 if (dbuf.dsize != rec.data_len) {
3027 rec.data_len = dbuf.dsize;
3028 return tdb_rec_write(tdb, rec_ptr, &rec);
3034 /* find an entry in the database given a key */
3035 /* If an entry doesn't exist tdb_err will be set to
3036 * TDB_ERR_NOEXIST. If a key has no data attached
3037 * then the TDB_DATA will have zero length but
3038 * a non-zero pointer
// Look key up under a read lock on its chain and return a copy of the
// record data obtained from tdb_alloc_read(). The chain lock is released
// before returning.
// NOTE(review): in the lines visible here ret.dsize is set to rec.data_len
// without checking whether tdb_alloc_read() returned NULL, so a caller may
// see a NULL dptr together with a non-zero dsize.
3040 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3043 struct list_struct rec;
3047 /* find which hash bucket it is in */
3048 hash = tdb->hash_fn(&key);
3049 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3052 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3054 ret.dsize = rec.data_len;
3055 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3060 * Find an entry in the database and hand the record's data to a parsing
3061 * function. The parsing function is executed under the chain read lock, so it
3062 * should be fast and should not block on other syscalls.
3064 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3066 * For mmapped tdb's that do not have a transaction open it points the parsing
3067 * function directly at the mmap area, it avoids the malloc/memcpy in this
3068 * case. If a transaction is open or no mmap is available, it has to do
3069 * malloc/read/parse/free.
3071 * This is interesting for all readers of potentially large data structures in
3072 * the tdb records, ldb indexes being one example.
// Look key up under a read lock on its chain, hand the record data to
// parser through tdb_parse_data(), then release the lock.
// A missing key yields TDB_ERRCODE(TDB_ERR_NOEXIST, 0).
// NOTE(review): the final return is missing here; presumably the result of
// tdb_parse_data() stored in ret is returned.
3075 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3076 int (*parser)(TDB_DATA key, TDB_DATA data,
3077 void *private_data),
3081 struct list_struct rec;
3085 /* find which hash bucket it is in */
3086 hash = tdb->hash_fn(&key);
3088 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3089 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3092 ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3093 rec.data_len, parser, private_data);
3095 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3100 /* check if an entry in the database exists
3102 note that 1 is returned if the key is found and 0 is returned if not found
3103 this doesn't match the conventions in the rest of this module, but is
3104 compatible with gdbm
// Existence test for a key whose hash is already known: calls
// tdb_find_lock_hash() with a read lock and, when the record was found,
// releases the chain lock again.
3106 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3108 struct list_struct rec;
3110 if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3112 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3116 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3118 u32 hash = tdb->hash_fn(&key);
3119 return tdb_exists_hash(tdb, key, hash);
3122 /* actually delete an entry in the database given the offset */
// Remove the record at rec_ptr from its hash chain and hand its space to
// tdb_free(). Returns -1 at once for read-only databases and during read
// traverses.
// If tdb_write_lock_record() fails the record is not unlinked: it is only
// marked with TDB_DEAD_MAGIC and written back.
// The caller passes the already-read record header in rec.
3123 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3125 tdb_off_t last_ptr, i;
3126 struct list_struct lastrec;
3128 if (tdb->read_only || tdb->traverse_read) return -1;
3130 if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3131 /* Someone traversing here: mark it as dead */
3132 rec->magic = TDB_DEAD_MAGIC;
3133 return tdb_rec_write(tdb, rec_ptr, rec);
3135 if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
// After this loop last_ptr is the offset of the predecessor of rec_ptr.
3138 /* find previous record in hash chain */
3139 if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3141 for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3142 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3145 /* unlink it: next ptr is at start of record. */
3147 last_ptr = TDB_HASH_TOP(rec->full_hash);
3148 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3151 /* recover the space */
3152 if (tdb_free(tdb, rec_ptr, rec) == -1)
// Walk the chain for hash and test each record for TDB_DEAD_MAGIC.
// tdb_delete_hash() compares the result with tdb->max_dead_records.
// NOTE(review): the counter, the loop header and the return lines are
// missing from this excerpt.
3157 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3161 struct list_struct rec;
3163 /* read in the hash top */
3164 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3168 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3171 if (rec.magic == TDB_DEAD_MAGIC) {
3180 * Purge all DEAD records from a hash chain
// Run tdb_do_delete() on every TDB_DEAD_MAGIC record of the chain for hash
// while holding the free-list lock (list -1), which is released at the end.
// tdb_store() treats a result of -1 as failure.
// NOTE(review): the loop header and the return lines are missing here.
3182 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3185 struct list_struct rec;
3188 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3192 /* read in the hash top */
3193 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3199 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3205 if (rec.magic == TDB_DEAD_MAGIC
3206 && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3213 tdb_unlock(tdb, -1, F_WRLCK);
3217 /* delete an entry in the database given a key */
// Delete key from the chain for hash.
// With tdb->max_dead_records != 0 the record is only marked TDB_DEAD_MAGIC,
// after the chain has been purged if tdb_count_dead() reached the limit.
// Otherwise the record is located with tdb_find_lock_hash() and removed
// through tdb_do_delete().
// The sequence number is bumped and the chain lock released on the way out.
// NOTE(review): the branch structure between the two modes and the return
// lines are missing from this excerpt.
3218 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3221 struct list_struct rec;
3224 if (tdb->max_dead_records != 0) {
3227 * Allow for some dead records per hash chain, mainly for
3228 * tdb's with a very high create/delete rate like locking.tdb.
3231 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3234 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3236 * Don't let the per-chain freelist grow too large,
3237 * delete all existing dead records
3239 tdb_purge_dead(tdb, hash);
3242 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3243 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3248 * Just mark the record as dead.
3250 rec.magic = TDB_DEAD_MAGIC;
3251 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3254 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3258 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3262 tdb_increment_seqnum(tdb);
3265 if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3266 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3270 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3272 u32 hash = tdb->hash_fn(&key);
3273 return tdb_delete_hash(tdb, key, hash);
3277 * See if we have a dead record around with enough space
// Scan the chain for hash for a dead record whose rec_len is at least
// length (first fit). The header of each record examined is read into r.
// tdb_store() uses the result as the offset of a reusable record.
// NOTE(review): the loop header and the return lines are missing here.
3279 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3280 struct list_struct *r, tdb_len_t length)
3284 /* read in the hash top */
3285 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3288 /* keep looking until we find the right record */
3290 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3293 if (TDB_DEAD(r) && r->rec_len >= length) {
3295 * First fit for simple coding, TODO: change to best
3305 /* store an element in the database, replacing any existing element
3308 return 0 on success, -1 on failure
// Store dbuf under key while holding the write lock on the key's chain.
// Refused with TDB_ERR_RDONLY for read-only databases and during read
// traverses.
// Order of attempts visible below:
//   1. TDB_INSERT: fail with TDB_ERR_EXISTS when the key is present;
//      otherwise try an in-place update through tdb_update_hash().
//   2. Delete any old record (not for TDB_INSERT).
//   3. Copy key and data into one malloc buffer p.
//   4. With max_dead_records != 0, reuse a dead record found on the chain.
//   5. Otherwise allocate via tdb_allocate() under the free-list lock and
//      link the new record at the head of the chain.
// NOTE(review): the goto labels, the returns and the release of p are on
// lines missing from this excerpt.
3310 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3312 struct list_struct rec;
3318 if (tdb->read_only || tdb->traverse_read) {
3319 tdb->ecode = TDB_ERR_RDONLY;
3323 /* find which hash bucket it is in */
3324 hash = tdb->hash_fn(&key);
3325 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3328 /* check for it existing, on insert. */
3329 if (flag == TDB_INSERT) {
3330 if (tdb_exists_hash(tdb, key, hash)) {
3331 tdb->ecode = TDB_ERR_EXISTS;
3335 /* first try in-place update, on modify or replace. */
3336 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3339 if (tdb->ecode == TDB_ERR_NOEXIST &&
3340 flag == TDB_MODIFY) {
3341 /* if the record doesn't exist and we are in TDB_MODIFY mode then
3342 we should fail the store */
3346 /* reset the error code potentially set by the tdb_update() */
3347 tdb->ecode = TDB_SUCCESS;
3349 /* delete any existing record - if it doesn't exist we don't
3350 care. Doing this first reduces fragmentation, and avoids
3351 coalescing with `allocated' block before it's updated. */
3352 if (flag != TDB_INSERT)
3353 tdb_delete_hash(tdb, key, hash);
3355 /* Copy key+value *before* allocating free space in case malloc
3356 fails and we are left with a dead spot in the tdb. */
3358 if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3359 tdb->ecode = TDB_ERR_OOM;
3363 memcpy(p, key.dptr, key.dsize);
3365 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3367 if (tdb->max_dead_records != 0) {
3369 * Allow for some dead records per hash chain, look if we can
3370 * find one that can hold the new record. We need enough space
3371 * for key, data and tailer. If we find one, we don't have to
3372 * consult the central freelist.
3374 rec_ptr = tdb_find_dead(
3376 key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3379 rec.key_len = key.dsize;
3380 rec.data_len = dbuf.dsize;
3381 rec.full_hash = hash;
3382 rec.magic = TDB_MAGIC;
3383 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3384 || tdb->methods->tdb_write(
3385 tdb, rec_ptr + sizeof(rec),
3386 p, key.dsize + dbuf.dsize) == -1) {
3394 * We have to allocate some space from the freelist, so this means we
3395 * have to lock it. Use the chance to purge all the DEAD records from
3396 * the hash chain under the freelist lock.
3399 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3403 if ((tdb->max_dead_records != 0)
3404 && (tdb_purge_dead(tdb, hash) == -1)) {
3405 tdb_unlock(tdb, -1, F_WRLCK);
3409 /* we have to allocate some space */
3410 rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3412 tdb_unlock(tdb, -1, F_WRLCK);
3418 /* Read hash top into next ptr */
3419 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3422 rec.key_len = key.dsize;
3423 rec.data_len = dbuf.dsize;
3424 rec.full_hash = hash;
3425 rec.magic = TDB_MAGIC;
3427 /* write out and point the top of the hash chain at it */
3428 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3429 || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3430 || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3431 /* Need to tdb_unallocate() here */
3439 tdb_increment_seqnum(tdb);
3443 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3448 /* Append to an entry. Create if not exist. */
3449 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3455 /* find which hash bucket it is in */
3456 hash = tdb->hash_fn(&key);
3457 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3460 dbuf = tdb_fetch(tdb, key);
3462 if (dbuf.dptr == NULL) {
3463 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3465 dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
3466 dbuf.dsize + new_dbuf.dsize);
3469 if (dbuf.dptr == NULL) {
3470 tdb->ecode = TDB_ERR_OOM;
3474 memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3475 dbuf.dsize += new_dbuf.dsize;
3477 ret = tdb_store(tdb, key, dbuf, 0);
3480 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3481 SAFE_FREE(dbuf.dptr);
3487 return the name of the current tdb file
3488 useful for external logging functions
// Accessor returning a C string for the given context.
// NOTE(review): the body is missing from this excerpt; presumably it returns
// tdb->name, the copy of the file name made in tdb_open_ex() - confirm.
3490 const char *tdb_name(struct tdb_context *tdb)
3496 return the underlying file descriptor being used by tdb, or -1
3497 useful for external routines that want to check the device/inode
// Accessor returning an int for the given context.
// NOTE(review): the body is missing from this excerpt; presumably it returns
// tdb->fd, the descriptor opened in tdb_open_ex() - confirm.
3500 int tdb_fd(struct tdb_context *tdb)
3506 return the current logging function
3507 useful for external tdb routines that wish to log tdb errors
3509 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3511 return tdb->log.log_fn;
3516 get the tdb sequence number. Only makes sense if the writers opened
3517 with TDB_SEQNUM set. Note that this sequence number will wrap quite
3518 quickly, so it should only be used for a 'has something changed'
3519 test, not for code that relies on the count of the number of changes
3520 made. If you want a counter then use a tdb record.
3522 The aim of this sequence number is to allow for a very lightweight
3523 test of a possible tdb change.
// Read the sequence number stored at TDB_SEQNUM_OFS. The result of
// tdb_ofs_read() is not checked.
// NOTE(review): the declaration of seqnum and the return line are missing
// from this excerpt.
3525 int tdb_get_seqnum(struct tdb_context *tdb)
3529 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3533 int tdb_hash_size(struct tdb_context *tdb)
3535 return tdb->header.hash_size;
3538 size_t tdb_map_size(struct tdb_context *tdb)
3540 return tdb->map_size;
// Accessor returning an int for the given context.
// NOTE(review): the body is missing from this excerpt; presumably it returns
// tdb->flags - confirm.
3543 int tdb_get_flags(struct tdb_context *tdb)
// Head of the per-process list of open contexts, chained through ->next.
// tdb_already_open() walks it and tdb_close() unlinks a context from it.
3550 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3551 static struct tdb_context *tdbs = NULL;
3554 /* This is based on the hash algorithm from gdbm */
3555 static unsigned int default_tdb_hash(TDB_DATA *key)
3557 u32 value; /* Used to compute the hash value. */
3558 u32 i; /* Used to cycle through random values. */
3560 /* Set the initial value from the key size. */
3561 for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3562 value = (value + (key->dptr[i] << (i*5 % 24)));
3564 return (1103515243 * value + 12345);
3568 /* initialise a new database with a specified hash size */
// Build an empty database image in memory: a zeroed tdb_header followed by
// hash_size+1 zeroed tdb_off_t slots, with version and hash_size filled in.
// For TDB_INTERNAL databases the buffer itself becomes tdb->map_ptr.
// Otherwise the file behind tdb->fd is rewound and truncated and the image,
// with TDB_MAGIC_FOOD copied into it, is written out.
// Returns TDB_ERRCODE(TDB_ERR_OOM, -1) when the buffer cannot be allocated.
// NOTE(review): the conversion calls, the failure label and the release of
// newdb are on lines missing from this excerpt.
3569 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3571 struct tdb_header *newdb;
3574 /* We make it up in memory, then write it out if not internal */
3575 size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3576 if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3577 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3579 /* Fill in the header */
3580 newdb->version = TDB_VERSION;
3581 newdb->hash_size = hash_size;
3582 if (tdb->flags & TDB_INTERNAL) {
3583 tdb->map_size = size;
3584 tdb->map_ptr = (char *)newdb;
3585 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3586 /* Convert the `ondisk' version if asked. */
3590 if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3593 if (ftruncate(tdb->fd, 0) == -1)
3596 /* This creates an endian-converted header, as if read from disk */
3598 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3599 /* Don't endian-convert the magic food! */
3600 memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3601 if (write(tdb->fd, newdb, size) != size) {
// Walk the tdbs list looking for a context whose device and inode both
// match the given pair. tdb_open_ex() refuses to open a file for which
// this reports a match.
// NOTE(review): the second parameter line and the return lines are missing
// from this excerpt.
3614 static int tdb_already_open(dev_t device,
3617 struct tdb_context *i;
3619 for (i = tdbs; i; i = i->next) {
3620 if (i->device == device && i->inode == ino) {
3628 /* open the database, creating it if necessary
3630 The open_flags and mode are passed straight to the open call on the
3631 database file. A flags value of O_WRONLY is invalid. The hash size
3632 is advisory, use zero for a default value.
3634 Return is NULL on error, in which case errno is also set. Don't
3635 try to call tdb_error or tdb_errname, just do strerror(errno).
3637 @param name may be NULL for internal databases. */
3638 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3639 int open_flags, mode_t mode)
3641 return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3644 /* a default logging function */
// Logger that tdb_open_ex() installs when the caller supplies no logging
// context. The prototype carries PRINTF_ATTRIBUTE(3, 4) so that format
// strings passed to it are checked by gcc.
// NOTE(review): the body is missing from this excerpt; presumably empty.
3645 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3646 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
// Full-featured open. Steps visible below:
//   - allocate a zeroed context and record flags, logger and hash function
//     (null_log_fn and default_tdb_hash are the fallbacks; tdb_open() passes
//     NULL for both log_ctx and hash_fn);
//   - reject O_WRONLY; read-only opens get TDB_NOLOCK and lose
//     TDB_CLEAR_IF_FIRST;
//   - TDB_INTERNAL databases are built in memory by tdb_new_database() and
//     skip all file handling;
//   - otherwise open the file, take GLOBAL_LOCK, truncate the file when
//     TDB_CLEAR_IF_FIRST is set and the ACTIVE_LOCK write lock is obtained,
//     then read and validate the header, creating a new database when it is
//     invalid and O_CREAT is set;
//   - work out the byte order, refuse files with rwlocks set and files
//     already open in this process, and fill in name, size, device, inode;
//   - run transaction recovery and release GLOBAL_LOCK.
// The failure path preserves errno and releases the map, the name and the
// file descriptor.
// NOTE(review): many lines (labels, returns, mmap handling, list insertion)
// are missing from this excerpt, so this summary covers only what is shown.
3651 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3652 int open_flags, mode_t mode,
3653 const struct tdb_logging_context *log_ctx,
3654 tdb_hash_func hash_fn)
3656 struct tdb_context *tdb;
3658 int rev = 0, locked = 0;
3662 if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3663 /* Can't log this */
3670 tdb->map_ptr = NULL;
3671 tdb->flags = tdb_flags;
3672 tdb->open_flags = open_flags;
3674 tdb->log = *log_ctx;
3676 tdb->log.log_fn = null_log_fn;
3677 tdb->log.log_private = NULL;
3679 tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3681 /* cache the page size */
3682 tdb->page_size = getpagesize();
3683 if (tdb->page_size <= 0) {
3684 tdb->page_size = 0x2000;
3687 if ((open_flags & O_ACCMODE) == O_WRONLY) {
3688 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3695 hash_size = DEFAULT_HASH_SIZE;
3696 if ((open_flags & O_ACCMODE) == O_RDONLY) {
3698 /* read only databases don't do locking or clear if first */
3699 tdb->flags |= TDB_NOLOCK;
3700 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3703 /* internal databases don't mmap or lock, and start off cleared */
3704 if (tdb->flags & TDB_INTERNAL) {
3705 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3706 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3707 if (tdb_new_database(tdb, hash_size) != 0) {
3708 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3714 if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3715 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3716 name, strerror(errno)));
3717 goto fail; /* errno set by open(2) */
3720 /* ensure there is only one process initialising at once */
3721 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3722 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3723 name, strerror(errno)));
3724 goto fail; /* errno set by tdb_brlock */
// locked records whether the non-blocking ACTIVE_LOCK write lock was
// obtained.
3727 /* we need to zero database if we are the only one with it open */
3728 if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3729 (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3730 open_flags |= O_CREAT;
3731 if (ftruncate(tdb->fd, 0) == -1) {
3732 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3733 "failed to truncate %s: %s\n",
3734 name, strerror(errno)));
3735 goto fail; /* errno set by ftruncate */
// A short read, a wrong magic string, or a version that is neither
// TDB_VERSION nor its byte-reversed form marks the file as invalid.
3739 if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3740 || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3741 || (tdb->header.version != TDB_VERSION
3742 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3743 /* its not a valid database - possibly initialise it */
3744 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3745 errno = EIO; /* ie bad format or something */
3748 rev = (tdb->flags & TDB_CONVERT);
// The version bytes are reassembled most-significant first to detect a
// big-endian file.
3750 vp = (unsigned char *)&tdb->header.version;
3751 vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3752 (((u32)vp[2]) << 8) | (u32)vp[3];
3753 tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3755 tdb->flags &= ~TDB_CONVERT;
3757 tdb->flags |= TDB_CONVERT;
3758 tdb_convert(&tdb->header, sizeof(tdb->header));
3760 if (fstat(tdb->fd, &st) == -1)
3763 if (tdb->header.rwlocks != 0) {
3764 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3768 /* Is it already in the open list? If so, fail. */
3769 if (tdb_already_open(st.st_dev, st.st_ino)) {
3770 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3771 "%s (%d,%d) is already open in this process\n",
3772 name, (int)st.st_dev, (int)st.st_ino));
3777 if (!(tdb->name = (char *)strdup(name))) {
3782 tdb->map_size = st.st_size;
3783 tdb->device = st.st_dev;
3784 tdb->inode = st.st_ino;
3785 tdb->max_dead_records = 0;
3788 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3789 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3790 "failed to take ACTIVE_LOCK on %s: %s\n",
3791 name, strerror(errno)));
3797 /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3798 we didn't get the initial exclusive lock as we need to let all other
3799 users know we're using it. */
3801 if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3802 /* leave this lock in place to indicate it's in use */
3803 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3807 /* if needed, run recovery */
3808 if (tdb_transaction_recover(tdb) == -1) {
3813 /* Internal (memory-only) databases skip all the code above to
3814 * do with disk files, and resume here by releasing their
3815 * global lock and hooking into the active list. */
3816 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
// errno is saved here so that the cleanup calls below cannot overwrite the
// value reported to the caller.
3823 { int save_errno = errno;
3829 if (tdb->flags & TDB_INTERNAL)
3830 SAFE_FREE(tdb->map_ptr);
3834 SAFE_FREE(tdb->name);
3836 if (close(tdb->fd) != 0)
3837 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3845 * Set the maximum number of dead records per hash chain
3848 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
/* Plain setter: stores max_dead in tdb->max_dead_records. The value is not
 * range-checked here and tdb is dereferenced without a NULL check. */
3850 tdb->max_dead_records = max_dead;
3856 * @returns -1 for error; 0 for success.
3858 int tdb_close(struct tdb_context *tdb)
/* Tears down an open context. In order: cancel any open transaction,
 * release the mapping, free the stored name, close the file descriptor,
 * free the lock records, take the context off the global tdbs list and
 * clear the structure. */
3860 struct tdb_context **i;
/* A transaction still in progress is cancelled through
 * tdb_transaction_cancel(). */
3863 if (tdb->transaction) {
3864 tdb_transaction_cancel(tdb);
/* TDB_INTERNAL databases release map_ptr with SAFE_FREE.
 * NOTE(review): the handling for non-internal databases is not visible in
 * this excerpt - confirm against the full file. */
3868 if (tdb->flags & TDB_INTERNAL)
3869 SAFE_FREE(tdb->map_ptr);
3873 SAFE_FREE(tdb->name);
/* ret receives the result of close(); presumably this is the value reported
 * to the caller per the @returns line - TODO confirm, the return statement
 * is outside the visible lines. */
3875 ret = close(tdb->fd);
3876 SAFE_FREE(tdb->lockrecs);
3878 /* Remove from contexts list */
/* i points at the link being examined (first tdbs itself, then each
 * context's next field), so the list can be edited through *i without
 * tracking a separate "previous" pointer. */
3879 for (i = &tdbs; *i; i = &(*i)->next) {
/* Clear every field of the context structure. */
3886 memset(tdb, 0, sizeof(*tdb));
3892 /* register a logging function */
3893 void tdb_set_logging_function(struct tdb_context *tdb,
3894 const struct tdb_logging_context *log_ctx)
/* The logging context is copied by value into tdb->log; any pointers held
 * inside the structure are copied as-is. log_ctx is dereferenced
 * unconditionally, so it must not be NULL. */
3896 tdb->log = *log_ctx;
/* Return the log_private pointer held in the context's logging structure
 * (tdb->log). tdb is dereferenced without a NULL check. */
3899 void *tdb_get_logging_private(struct tdb_context *tdb)
3901 return tdb->log.log_private;
3904 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
3905 seek pointer from our parent and to re-establish locks */
3906 int tdb_reopen(struct tdb_context *tdb)
/* Sequence: refuse if locks or a transaction are held, unmap, close and
 * reopen the file by its stored name, retake the active lock for
 * TDB_CLEAR_IF_FIRST databases, then check that the reopened file has the
 * same device/inode as the one recorded in the context.
 * NOTE(review): what happens after each logged failure is not visible in
 * this excerpt - confirm against the full file. */
/* In-memory databases have no file to reopen. */
3910 if (tdb->flags & TDB_INTERNAL) {
3911 return 0; /* Nothing to do. */
/* Reopening is rejected while any lock is held: either a non-zero
 * num_locks or a non-zero global lock count. */
3914 if (tdb->num_locks != 0 || tdb->global_lock.count) {
3915 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
/* Likewise rejected while a transaction is active. */
3919 if (tdb->transaction != 0) {
3920 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
/* Drop the existing mapping before the descriptor is replaced. */
3924 if (tdb_munmap(tdb) != 0) {
3925 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
/* A failing close() is only logged; the open() below is still attempted. */
3928 if (close(tdb->fd) != 0)
3929 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
/* Reopen by name using the stored open_flags with O_CREAT and O_TRUNC
 * masked out, so this call can neither create nor truncate the file. */
3930 tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
3931 if (tdb->fd == -1) {
3932 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
/* Only for TDB_CLEAR_IF_FIRST databases: take a read lock on ACTIVE_LOCK
 * on the new descriptor, waiting if necessary (F_SETLKW). */
3935 if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
3936 (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
3937 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
3940 if (fstat(tdb->fd, &st) != 0) {
3941 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
/* The name may now refer to a different file than the one recorded in
 * tdb->inode / tdb->device; treat a mismatch as an error. */
3944 if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
3945 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
3957 /* reopen all tdb's */
3958 int tdb_reopen_all(int parent_longlived)
3960 struct tdb_context *tdb;
3962 for (tdb=tdbs; tdb; tdb = tdb->next) {
3964 * If the parent is longlived (ie. a
3965 * parent daemon architecture), we know
3966 * it will keep its active lock on a
3967 * tdb opened with CLEAR_IF_FIRST. Thus
3968 * for child processes we don't have to
3969 * add an active lock. This is essential
3970 * to improve performance on systems that
3971 * keep POSIX locks as a non-scalable data
3972 * structure in the kernel.
3974 if (parent_longlived) {
3975 /* Ensure no clear-if-first. */
3976 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3979 if (tdb_reopen(tdb) != 0)