Whamcloud - gitweb
Add portability checks to support DJGPP
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /* 
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb
3 Rev: 22080
4 Last Changed: 2007-04-03 05:08:18 -0400
5 */
6  /* 
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12    
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16    
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #define _XOPEN_SOURCE 500
40
41 #include <unistd.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <stdarg.h>
45 #include <stddef.h>
46 #include <errno.h>
47 #include <string.h>
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
50 #endif
51 #include <sys/time.h>
52 #include <sys/types.h>
53 #include <time.h>
54 #ifdef HAVE_UTIME_H
55 #include <utime.h>
56 #endif
57 #include <sys/stat.h>
58 #include <sys/file.h>
59 #include <fcntl.h>
60
61 #ifdef HAVE_SYS_MMAN_H
62 #include <sys/mman.h>
63 #endif
64
65 #ifndef MAP_FILE
66 #define MAP_FILE 0
67 #endif
68
69 #ifndef MAP_FAILED
70 #define MAP_FAILED ((void *)-1)
71 #endif
72
73 #ifndef HAVE_STRDUP
74 #define strdup rep_strdup
75 static char *rep_strdup(const char *s)
76 {
77         char *ret;
78         int length;
79         if (!s)
80                 return NULL;
81
82         if (!length)
83                 length = strlen(s);
84
85         ret = malloc(length + 1);
86         if (ret) {
87                 strncpy(ret, s, length);
88                 ret[length] = '\0';
89         }
90         return ret;
91 }
92 #endif
93
94 #ifndef PRINTF_ATTRIBUTE
95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
96 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
97  * the parameter containing the format, and a2 the index of the first
98  * argument. Note that some gcc 2.x versions don't handle this
99  * properly **/
100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
101 #else
102 #define PRINTF_ATTRIBUTE(a1, a2)
103 #endif
104 #endif
105
106 #include "tdb.h"
107
108 #ifndef u32
109 #define u32 unsigned
110 #endif
111
112 #ifndef HAVE_GETPAGESIZE
113 #define getpagesize() 0x2000
114 #endif
115
116 typedef u32 tdb_len_t;
117 typedef u32 tdb_off_t;
118
119 #ifndef offsetof
120 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
121 #endif
122
123 #define TDB_MAGIC_FOOD "TDB file\n"
124 #define TDB_VERSION (0x26011967 + 6)
125 #define TDB_MAGIC (0x26011999U)
126 #define TDB_FREE_MAGIC (~TDB_MAGIC)
127 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
128 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
129 #define TDB_ALIGNMENT 4
130 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
131 #define DEFAULT_HASH_SIZE 131
132 #define FREELIST_TOP (sizeof(struct tdb_header))
133 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
134 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
135 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
136 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
137 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
138 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
139 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
140 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
141 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
142 #define TDB_PAD_BYTE 0x42
143 #define TDB_PAD_U32  0x42424242
144
145 /* NB assumes there is a local variable called "tdb" that is the
146  * current context, also takes doubly-parenthesized print-style
147  * argument. */
148 #define TDB_LOG(x) tdb->log.log_fn x
149
150 /* lock offsets */
151 #define GLOBAL_LOCK      0
152 #define ACTIVE_LOCK      4
153 #define TRANSACTION_LOCK 8
154
155 /* free memory if the pointer is valid and zero the pointer */
156 #ifndef SAFE_FREE
157 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
158 #endif
159
160 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
161
162 #define DOCONV() (tdb->flags & TDB_CONVERT)
163 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
164
165
166 /* the body of the database is made of one list_struct for the free space
167    plus a separate data list for each hash value */
168 struct list_struct {
169         tdb_off_t next; /* offset of the next record in the list */
170         tdb_len_t rec_len; /* total byte length of record */
171         tdb_len_t key_len; /* byte length of key */
172         tdb_len_t data_len; /* byte length of data */
173         u32 full_hash; /* the full 32 bit hash of the key */
174         u32 magic;   /* try to catch errors */
175         /* the following union is implied:
176                 union {
177                         char record[rec_len];
178                         struct {
179                                 char key[key_len];
180                                 char data[data_len];
181                         }
182                         u32 totalsize; (tailer)
183                 }
184         */
185 };
186
187
188 /* this is stored at the front of every database */
189 struct tdb_header {
190         char magic_food[32]; /* for /etc/magic */
191         u32 version; /* version of the code */
192         u32 hash_size; /* number of hash entries */
193         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
194         tdb_off_t recovery_start; /* offset of transaction recovery region */
195         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
196         tdb_off_t reserved[29];
197 };
198
199 struct tdb_lock_type {
200         int list;
201         u32 count;
202         u32 ltype;
203 };
204
205 struct tdb_traverse_lock {
206         struct tdb_traverse_lock *next;
207         u32 off;
208         u32 hash;
209         int lock_rw;
210 };
211
212
213 struct tdb_methods {
214         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
215         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
216         void (*next_hash_chain)(struct tdb_context *, u32 *);
217         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
218         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
219         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
220 };
221
222 struct tdb_context {
223         char *name; /* the name of the database */
224         void *map_ptr; /* where it is currently mapped */
225         int fd; /* open file descriptor for the database */
226         tdb_len_t map_size; /* how much space has been mapped */
227         int read_only; /* opened read-only */
228         int traverse_read; /* read-only traversal */
229         struct tdb_lock_type global_lock;
230         int num_lockrecs;
231         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
232         enum TDB_ERROR ecode; /* error code for last tdb error */
233         struct tdb_header header; /* a cached copy of the header */
234         u32 flags; /* the flags passed to tdb_open */
235         struct tdb_traverse_lock travlocks; /* current traversal locks */
236         struct tdb_context *next; /* all tdbs to avoid multiple opens */
237         dev_t device;   /* uniquely identifies this tdb */
238         ino_t inode;    /* uniquely identifies this tdb */
239         struct tdb_logging_context log;
240         unsigned int (*hash_fn)(TDB_DATA *key);
241         int open_flags; /* flags used in the open - needed by reopen */
242         unsigned int num_locks; /* number of chain locks held */
243         const struct tdb_methods *methods;
244         struct tdb_transaction *transaction;
245         int page_size;
246         int max_dead_records;
247 };
248
249
250 /*
251   internal prototypes
252 */
253 static int tdb_munmap(struct tdb_context *tdb);
254 static void tdb_mmap(struct tdb_context *tdb);
255 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
256 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
257 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
258 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
259 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
260 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
261 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
262 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
263 static void *tdb_convert(void *buf, u32 size);
264 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
265 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
266 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
269 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
270 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
271 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
272 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
273 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
274 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
275                    tdb_off_t offset, tdb_len_t len,
276                    int (*parser)(TDB_DATA key, TDB_DATA data,
277                                  void *private_data),
278                    void *private_data);
279 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
280                            struct list_struct *rec);
281 static void tdb_io_init(struct tdb_context *tdb);
282 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
283
284
285 /* file: error.c */
286
287 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
288 {
289         return tdb->ecode;
290 }
291
292 static struct tdb_errname {
293         enum TDB_ERROR ecode; const char *estring;
294 } emap[] = { {TDB_SUCCESS, "Success"},
295              {TDB_ERR_CORRUPT, "Corrupt database"},
296              {TDB_ERR_IO, "IO Error"},
297              {TDB_ERR_LOCK, "Locking error"},
298              {TDB_ERR_OOM, "Out of memory"},
299              {TDB_ERR_EXISTS, "Record exists"},
300              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
301              {TDB_ERR_EINVAL, "Invalid parameter"},
302              {TDB_ERR_NOEXIST, "Record does not exist"},
303              {TDB_ERR_RDONLY, "write not permitted"} };
304
305 /* Error string for the last tdb error */
306 const char *tdb_errorstr(struct tdb_context *tdb)
307 {
308         u32 i;
309         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
310                 if (tdb->ecode == emap[i].ecode)
311                         return emap[i].estring;
312         return "Invalid error code";
313 }
314
315 /* file: lock.c */
316
317 /* a byte range locking function - return 0 on success
318    this functions locks/unlocks 1 byte at the specified offset.
319
320    On error, errno is also set so that errors are passed back properly
321    through tdb_open(). 
322
323    note that a len of zero means lock to end of file
324 */
325 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
326                int rw_type, int lck_type, int probe, size_t len)
327 {
328         struct flock fl;
329         int ret;
330
331         if (tdb->flags & TDB_NOLOCK) {
332                 return 0;
333         }
334
335         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
336                 tdb->ecode = TDB_ERR_RDONLY;
337                 return -1;
338         }
339
340         fl.l_type = rw_type;
341         fl.l_whence = SEEK_SET;
342         fl.l_start = offset;
343         fl.l_len = len;
344         fl.l_pid = 0;
345
346         do {
347                 ret = fcntl(tdb->fd,lck_type,&fl);
348         } while (ret == -1 && errno == EINTR);
349
350         if (ret == -1) {
351                 /* Generic lock error. errno set by fcntl.
352                  * EAGAIN is an expected return from non-blocking
353                  * locks. */
354                 if (!probe && lck_type != F_SETLK) {
355                         /* Ensure error code is set for log fun to examine. */
356                         tdb->ecode = TDB_ERR_LOCK;
357                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
358                                  tdb->fd, offset, rw_type, lck_type, (int)len));
359                 }
360                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
361         }
362         return 0;
363 }
364
365
366 /*
367   upgrade a read lock to a write lock. This needs to be handled in a
368   special way as some OSes (such as solaris) have too conservative
369   deadlock detection and claim a deadlock when progress can be
370   made. For those OSes we may loop for a while.  
371 */
372 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
373 {
374         int count = 1000;
375         while (count--) {
376                 struct timeval tv;
377                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
378                         return 0;
379                 }
380                 if (errno != EDEADLK) {
381                         break;
382                 }
383                 /* sleep for as short a time as we can - more portable than usleep() */
384                 tv.tv_sec = 0;
385                 tv.tv_usec = 1;
386                 select(0, NULL, NULL, NULL, &tv);
387         }
388         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
389         return -1;
390 }
391
392
393 /* lock a list in the database. list -1 is the alloc list */
394 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
395 {
396         struct tdb_lock_type *new_lck;
397         int i;
398
399         /* a global lock allows us to avoid per chain locks */
400         if (tdb->global_lock.count && 
401             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
402                 return 0;
403         }
404
405         if (tdb->global_lock.count) {
406                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
407         }
408
409         if (list < -1 || list >= (int)tdb->header.hash_size) {
410                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 
411                            list, ltype));
412                 return -1;
413         }
414         if (tdb->flags & TDB_NOLOCK)
415                 return 0;
416
417         for (i=0; i<tdb->num_lockrecs; i++) {
418                 if (tdb->lockrecs[i].list == list) {
419                         if (tdb->lockrecs[i].count == 0) {
420                                 /*
421                                  * Can't happen, see tdb_unlock(). It should
422                                  * be an assert.
423                                  */
424                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
425                                          "lck->count == 0 for list %d", list));
426                         }
427                         /*
428                          * Just increment the in-memory struct, posix locks
429                          * don't stack.
430                          */
431                         tdb->lockrecs[i].count++;
432                         return 0;
433                 }
434         }
435
436         new_lck = (struct tdb_lock_type *)realloc(
437                 tdb->lockrecs,
438                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
439         if (new_lck == NULL) {
440                 errno = ENOMEM;
441                 return -1;
442         }
443         tdb->lockrecs = new_lck;
444
445         /* Since fcntl locks don't nest, we do a lock for the first one,
446            and simply bump the count for future ones */
447         if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW,
448                                      0, 1)) {
449                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
450                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
451                 return -1;
452         }
453
454         tdb->num_locks++;
455
456         tdb->lockrecs[tdb->num_lockrecs].list = list;
457         tdb->lockrecs[tdb->num_lockrecs].count = 1;
458         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
459         tdb->num_lockrecs += 1;
460
461         return 0;
462 }
463
464 /* unlock the database: returns void because it's too late for errors. */
465         /* changed to return int it may be interesting to know there
466            has been an error  --simo */
467 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
468 {
469         int ret = -1;
470         int i;
471         struct tdb_lock_type *lck = NULL;
472
473         /* a global lock allows us to avoid per chain locks */
474         if (tdb->global_lock.count && 
475             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
476                 return 0;
477         }
478
479         if (tdb->global_lock.count) {
480                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
481         }
482
483         if (tdb->flags & TDB_NOLOCK)
484                 return 0;
485
486         /* Sanity checks */
487         if (list < -1 || list >= (int)tdb->header.hash_size) {
488                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
489                 return ret;
490         }
491
492         for (i=0; i<tdb->num_lockrecs; i++) {
493                 if (tdb->lockrecs[i].list == list) {
494                         lck = &tdb->lockrecs[i];
495                         break;
496                 }
497         }
498
499         if ((lck == NULL) || (lck->count == 0)) {
500                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
501                 return -1;
502         }
503
504         if (lck->count > 1) {
505                 lck->count--;
506                 return 0;
507         }
508
509         /*
510          * This lock has count==1 left, so we need to unlock it in the
511          * kernel. We don't bother with decrementing the in-memory array
512          * element, we're about to overwrite it with the last array element
513          * anyway.
514          */
515
516         ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
517                                        F_SETLKW, 0, 1);
518         tdb->num_locks--;
519
520         /*
521          * Shrink the array by overwriting the element just unlocked with the
522          * last array element.
523          */
524
525         if (tdb->num_lockrecs > 1) {
526                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
527         }
528         tdb->num_lockrecs -= 1;
529
530         /*
531          * We don't bother with realloc when the array shrinks, but if we have
532          * a completely idle tdb we should get rid of the locked array.
533          */
534
535         if (tdb->num_lockrecs == 0) {
536                 SAFE_FREE(tdb->lockrecs);
537         }
538
539         if (ret)
540                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n")); 
541         return ret;
542 }
543
544
545
546 /* lock/unlock entire database */
547 static int _tdb_lockall(struct tdb_context *tdb, int ltype)
548 {
549         /* There are no locks on read-only dbs */
550         if (tdb->read_only || tdb->traverse_read)
551                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
552
553         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
554                 tdb->global_lock.count++;
555                 return 0;
556         }
557
558         if (tdb->global_lock.count) {
559                 /* a global lock of a different type exists */
560                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
561         }
562         
563         if (tdb->num_locks != 0) {
564                 /* can't combine global and chain locks */
565                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
566         }
567
568         if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW, 
569                                      0, 4*tdb->header.hash_size)) {
570                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
571                 return -1;
572         }
573
574         tdb->global_lock.count = 1;
575         tdb->global_lock.ltype = ltype;
576
577         return 0;
578 }
579
580 /* unlock entire db */
581 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
582 {
583         /* There are no locks on read-only dbs */
584         if (tdb->read_only || tdb->traverse_read) {
585                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
586         }
587
588         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
589                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
590         }
591
592         if (tdb->global_lock.count > 1) {
593                 tdb->global_lock.count--;
594                 return 0;
595         }
596
597         if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
598                                      0, 4*tdb->header.hash_size)) {
599                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
600                 return -1;
601         }
602
603         tdb->global_lock.count = 0;
604         tdb->global_lock.ltype = 0;
605
606         return 0;
607 }
608
609 /* lock entire database with write lock */
610 int tdb_lockall(struct tdb_context *tdb)
611 {
612         return _tdb_lockall(tdb, F_WRLCK);
613 }
614
615 /* unlock entire database with write lock */
616 int tdb_unlockall(struct tdb_context *tdb)
617 {
618         return _tdb_unlockall(tdb, F_WRLCK);
619 }
620
621 /* lock entire database with read lock */
622 int tdb_lockall_read(struct tdb_context *tdb)
623 {
624         return _tdb_lockall(tdb, F_RDLCK);
625 }
626
627 /* unlock entire database with read lock */
628 int tdb_unlockall_read(struct tdb_context *tdb)
629 {
630         return _tdb_unlockall(tdb, F_RDLCK);
631 }
632
633 /* lock/unlock one hash chain. This is meant to be used to reduce
634    contention - it cannot guarantee how many records will be locked */
635 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
636 {
637         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
638 }
639
640 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
641 {
642         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
643 }
644
645 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
646 {
647         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
648 }
649
650 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
651 {
652         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
653 }
654
655
656
657 /* record lock stops delete underneath */
658 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
659 {
660         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
661 }
662
663 /*
664   Write locks override our own fcntl readlocks, so check it here.
665   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
666   an error to fail to get the lock here.
667 */
668 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
669 {
670         struct tdb_traverse_lock *i;
671         for (i = &tdb->travlocks; i; i = i->next)
672                 if (i->off == off)
673                         return -1;
674         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
675 }
676
677 /*
678   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
679   an error to fail to get the lock here.
680 */
681 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
682 {
683         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
684 }
685
686 /* fcntl locks don't stack: avoid unlocking someone else's */
687 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
688 {
689         struct tdb_traverse_lock *i;
690         u32 count = 0;
691
692         if (off == 0)
693                 return 0;
694         for (i = &tdb->travlocks; i; i = i->next)
695                 if (i->off == off)
696                         count++;
697         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
698 }
699
700 /* file: io.c */
701
702 /* check for an out of bounds access - if it is out of bounds then
703    see if the database has been expanded by someone else and expand
704    if necessary 
705    note that "len" is the minimum length needed for the db
706 */
707 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
708 {
709         struct stat st;
710         if (len <= tdb->map_size)
711                 return 0;
712         if (tdb->flags & TDB_INTERNAL) {
713                 if (!probe) {
714                         /* Ensure ecode is set for log fn. */
715                         tdb->ecode = TDB_ERR_IO;
716                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
717                                  (int)len, (int)tdb->map_size));
718                 }
719                 return TDB_ERRCODE(TDB_ERR_IO, -1);
720         }
721
722         if (fstat(tdb->fd, &st) == -1) {
723                 return TDB_ERRCODE(TDB_ERR_IO, -1);
724         }
725
726         if (st.st_size < (size_t)len) {
727                 if (!probe) {
728                         /* Ensure ecode is set for log fn. */
729                         tdb->ecode = TDB_ERR_IO;
730                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
731                                  (int)len, (int)st.st_size));
732                 }
733                 return TDB_ERRCODE(TDB_ERR_IO, -1);
734         }
735
736         /* Unmap, update size, remap */
737         if (tdb_munmap(tdb) == -1)
738                 return TDB_ERRCODE(TDB_ERR_IO, -1);
739         tdb->map_size = st.st_size;
740         tdb_mmap(tdb);
741         return 0;
742 }
743
744 /* write a lump of data at a specified offset */
745 static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
746                      const void *buf, tdb_len_t len)
747 {
748         if (len == 0) {
749                 return 0;
750         }
751
752         if (tdb->read_only || tdb->traverse_read) {
753                 tdb->ecode = TDB_ERR_RDONLY;
754                 return -1;
755         }
756
757         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
758                 return -1;
759
760         if (tdb->map_ptr) {
761                 memcpy(off + (char *)tdb->map_ptr, buf, len);
762         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
763                 /* Ensure ecode is set for log fn. */
764                 tdb->ecode = TDB_ERR_IO;
765                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
766                            off, len, strerror(errno)));
767                 return TDB_ERRCODE(TDB_ERR_IO, -1);
768         }
769         return 0;
770 }
771
772 /* Endian conversion: we only ever deal with 4 byte quantities */
773 void *tdb_convert(void *buf, u32 size)
774 {
775         u32 i, *p = (u32 *)buf;
776         for (i = 0; i < size / 4; i++)
777                 p[i] = TDB_BYTEREV(p[i]);
778         return buf;
779 }
780
781
782 /* read a lump of data at a specified offset, maybe convert */
783 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
784                     tdb_len_t len, int cv)
785 {
786         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
787                 return -1;
788         }
789
790         if (tdb->map_ptr) {
791                 memcpy(buf, off + (char *)tdb->map_ptr, len);
792         } else {
793                 ssize_t ret = pread(tdb->fd, buf, len, off);
794                 if (ret != (ssize_t)len) {
795                         /* Ensure ecode is set for log fn. */
796                         tdb->ecode = TDB_ERR_IO;
797                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
798                                  "len=%d ret=%d (%s) map_size=%d\n",
799                                  (int)off, (int)len, (int)ret, strerror(errno),
800                                  (int)tdb->map_size));
801                         return TDB_ERRCODE(TDB_ERR_IO, -1);
802                 }
803         }
804         if (cv) {
805                 tdb_convert(buf, len);
806         }
807         return 0;
808 }
809
810
811
812 /*
813   do an unlocked scan of the hash table heads to find the next non-zero head. The value
814   will then be confirmed with the lock held
815 */              
816 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
817 {
818         u32 h = *chain;
819         if (tdb->map_ptr) {
820                 for (;h < tdb->header.hash_size;h++) {
821                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
822                                 break;
823                         }
824                 }
825         } else {
826                 u32 off=0;
827                 for (;h < tdb->header.hash_size;h++) {
828                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
829                                 break;
830                         }
831                 }
832         }
833         (*chain) = h;
834 }
835
836
837 int tdb_munmap(struct tdb_context *tdb)
838 {
839         if (tdb->flags & TDB_INTERNAL)
840                 return 0;
841
842 #ifdef HAVE_MMAP
843         if (tdb->map_ptr) {
844                 int ret = munmap(tdb->map_ptr, tdb->map_size);
845                 if (ret != 0)
846                         return ret;
847         }
848 #endif
849         tdb->map_ptr = NULL;
850         return 0;
851 }
852
853 void tdb_mmap(struct tdb_context *tdb)
854 {
855         if (tdb->flags & TDB_INTERNAL)
856                 return;
857
858 #ifdef HAVE_MMAP
859         if (!(tdb->flags & TDB_NOMMAP)) {
860                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
861                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
862                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
863
864                 /*
865                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
866                  */
867
868                 if (tdb->map_ptr == MAP_FAILED) {
869                         tdb->map_ptr = NULL;
870                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n", 
871                                  tdb->map_size, strerror(errno)));
872                 }
873         } else {
874                 tdb->map_ptr = NULL;
875         }
876 #else
877         tdb->map_ptr = NULL;
878 #endif
879 }
880
881 /* expand a file.  we prefer to use ftruncate, as that is what posix
882   says to use for mmap expansion */
883 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
884 {
885         char buf[1024];
886
887         if (tdb->read_only || tdb->traverse_read) {
888                 tdb->ecode = TDB_ERR_RDONLY;
889                 return -1;
890         }
891
892         if (ftruncate(tdb->fd, size+addition) == -1) {
893                 char b = 0;
894                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
895                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n", 
896                                  size+addition, strerror(errno)));
897                         return -1;
898                 }
899         }
900
901         /* now fill the file with something. This ensures that the
902            file isn't sparse, which would be very bad if we ran out of
903            disk. This must be done with write, not via mmap */
904         memset(buf, TDB_PAD_BYTE, sizeof(buf));
905         while (addition) {
906                 int n = addition>sizeof(buf)?sizeof(buf):addition;
907                 int ret = pwrite(tdb->fd, buf, n, size);
908                 if (ret != n) {
909                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n", 
910                                    n, strerror(errno)));
911                         return -1;
912                 }
913                 addition -= n;
914                 size += n;
915         }
916         return 0;
917 }
918
919
920 /* expand the database at least size bytes by expanding the underlying
921    file and doing the mmap again if necessary */
922 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
923 {
924         struct list_struct rec;
925         tdb_off_t offset;
926
927         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
928                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
929                 return -1;
930         }
931
932         /* must know about any previous expansions by another process */
933         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
934
935         /* always make room for at least 10 more records, and round
936            the database up to a multiple of the page size */
937         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
938
939         if (!(tdb->flags & TDB_INTERNAL))
940                 tdb_munmap(tdb);
941
942         /*
943          * We must ensure the file is unmapped before doing this
944          * to ensure consistency with systems like OpenBSD where
945          * writes and mmaps are not consistent.
946          */
947
948         /* expand the file itself */
949         if (!(tdb->flags & TDB_INTERNAL)) {
950                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
951                         goto fail;
952         }
953
954         tdb->map_size += size;
955
956         if (tdb->flags & TDB_INTERNAL) {
957                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
958                                                     tdb->map_size);
959                 if (!new_map_ptr) {
960                         tdb->map_size -= size;
961                         goto fail;
962                 }
963                 tdb->map_ptr = new_map_ptr;
964         } else {
965                 /*
966                  * We must ensure the file is remapped before adding the space
967                  * to ensure consistency with systems like OpenBSD where
968                  * writes and mmaps are not consistent.
969                  */
970
971                 /* We're ok if the mmap fails as we'll fallback to read/write */
972                 tdb_mmap(tdb);
973         }
974
975         /* form a new freelist record */
976         memset(&rec,'\0',sizeof(rec));
977         rec.rec_len = size - sizeof(rec);
978
979         /* link it into the free list */
980         offset = tdb->map_size - size;
981         if (tdb_free(tdb, offset, &rec) == -1)
982                 goto fail;
983
984         tdb_unlock(tdb, -1, F_WRLCK);
985         return 0;
986  fail:
987         tdb_unlock(tdb, -1, F_WRLCK);
988         return -1;
989 }
990
991 /* read/write a tdb_off_t */
992 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
993 {
994         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
995 }
996
997 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
998 {
999         tdb_off_t off = *d;
1000         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1001 }
1002
1003
1004 /* read a lump of data, allocating the space for it */
1005 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1006 {
1007         unsigned char *buf;
1008
1009         /* some systems don't like zero length malloc */
1010         if (len == 0) {
1011                 len = 1;
1012         }
1013
1014         if (!(buf = (unsigned char *)malloc(len))) {
1015                 /* Ensure ecode is set for log fn. */
1016                 tdb->ecode = TDB_ERR_OOM;
1017                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1018                            len, strerror(errno)));
1019                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1020         }
1021         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1022                 SAFE_FREE(buf);
1023                 return NULL;
1024         }
1025         return buf;
1026 }
1027
1028 /* Give a piece of tdb data to a parser */
1029
1030 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1031                    tdb_off_t offset, tdb_len_t len,
1032                    int (*parser)(TDB_DATA key, TDB_DATA data,
1033                                  void *private_data),
1034                    void *private_data)
1035 {
1036         TDB_DATA data;
1037         int result;
1038
1039         data.dsize = len;
1040
1041         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1042                 /*
1043                  * Optimize by avoiding the malloc/memcpy/free, point the
1044                  * parser directly at the mmap area.
1045                  */
1046                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1047                         return -1;
1048                 }
1049                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1050                 return parser(key, data, private_data);
1051         }
1052
1053         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1054                 return -1;
1055         }
1056
1057         result = parser(key, data, private_data);
1058         free(data.dptr);
1059         return result;
1060 }
1061
1062 /* read/write a record */
1063 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1064 {
1065         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1066                 return -1;
1067         if (TDB_BAD_MAGIC(rec)) {
1068                 /* Ensure ecode is set for log fn. */
1069                 tdb->ecode = TDB_ERR_CORRUPT;
1070                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1071                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1072         }
1073         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1074 }
1075
1076 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1077 {
1078         struct list_struct r = *rec;
1079         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1080 }
1081
1082 static const struct tdb_methods io_methods = {
1083         tdb_read,
1084         tdb_write,
1085         tdb_next_hash_chain,
1086         tdb_oob,
1087         tdb_expand_file,
1088         tdb_brlock
1089 };
1090
1091 /*
1092   initialise the default methods table
1093 */
1094 void tdb_io_init(struct tdb_context *tdb)
1095 {
1096         tdb->methods = &io_methods;
1097 }
1098
1099 /* file: transaction.c */
1100
1101 /*
1102   transaction design:
1103
1104   - only allow a single transaction at a time per database. This makes
1105     using the transaction API simpler, as otherwise the caller would
1106     have to cope with temporary failures in transactions that conflict
1107     with other current transactions
1108
1109   - keep the transaction recovery information in the same file as the
1110     database, using a special 'transaction recovery' record pointed at
1111     by the header. This removes the need for extra journal files as
1112     used by some other databases
1113
1114   - dynamically allocated the transaction recover record, re-using it
1115     for subsequent transactions. If a larger record is needed then
1116     tdb_free() the old record to place it on the normal tdb freelist
1117     before allocating the new record
1118
1119   - during transactions, keep a linked list of writes all that have
1120     been performed by intercepting all tdb_write() calls. The hooked
1121     transaction versions of tdb_read() and tdb_write() check this
1122     linked list and try to use the elements of the list in preference
1123     to the real database.
1124
1125   - don't allow any locks to be held when a transaction starts,
1126     otherwise we can end up with deadlock (plus lack of lock nesting
1127     in posix locks would mean the lock is lost)
1128
1129   - if the caller gains a lock during the transaction but doesn't
1130     release it then fail the commit
1131
1132   - allow for nested calls to tdb_transaction_start(), re-using the
1133     existing transaction record. If the inner transaction is cancelled
1134     then a subsequent commit will fail
1135  
1136   - keep a mirrored copy of the tdb hash chain heads to allow for the
1137     fast hash heads scan on traverse, updating the mirrored copy in
1138     the transaction version of tdb_write
1139
1140   - allow callers to mix transaction and non-transaction use of tdb,
1141     although once a transaction is started then an exclusive lock is
1142     gained until the transaction is committed or cancelled
1143
1144   - the commit stategy involves first saving away all modified data
1145     into a linearised buffer in the transaction recovery area, then
1146     marking the transaction recovery area with a magic value to
1147     indicate a valid recovery record. In total 4 fsync/msync calls are
1148     needed per commit to prevent race conditions. It might be possible
1149     to reduce this to 3 or even 2 with some more work.
1150
1151   - check for a valid recovery record on open of the tdb, while the
1152     global lock is held. Automatically recover from the transaction
1153     recovery area if needed, then continue with the open as
1154     usual. This allows for smooth crash recovery with no administrator
1155     intervention.
1156
1157   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1158     still available, but no transaction recovery area is used and no
1159     fsync/msync calls are made.
1160
1161 */
1162
1163 struct tdb_transaction_el {
1164         struct tdb_transaction_el *next, *prev;
1165         tdb_off_t offset;
1166         tdb_len_t length;
1167         unsigned char *data;
1168 };
1169
1170 /*
1171   hold the context of any current transaction
1172 */
1173 struct tdb_transaction {
1174         /* we keep a mirrored copy of the tdb hash heads here so
1175            tdb_next_hash_chain() can operate efficiently */
1176         u32 *hash_heads;
1177
1178         /* the original io methods - used to do IOs to the real db */
1179         const struct tdb_methods *io_methods;
1180
1181         /* the list of transaction elements. We use a doubly linked
1182            list with a last pointer to allow us to keep the list
1183            ordered, with first element at the front of the list. It
1184            needs to be doubly linked as the read/write traversals need
1185            to be backwards, while the commit needs to be forwards */
1186         struct tdb_transaction_el *elements, *elements_last;
1187
1188         /* non-zero when an internal transaction error has
1189            occurred. All write operations will then fail until the
1190            transaction is ended */
1191         int transaction_error;
1192
1193         /* when inside a transaction we need to keep track of any
1194            nested tdb_transaction_start() calls, as these are allowed,
1195            but don't create a new transaction */
1196         int nesting;
1197
1198         /* old file size before transaction */
1199         tdb_len_t old_map_size;
1200 };
1201
1202
1203 /*
1204   read while in a transaction. We need to check first if the data is in our list
1205   of transaction elements, then if not do a real read
1206 */
1207 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
1208                             tdb_len_t len, int cv)
1209 {
1210         struct tdb_transaction_el *el;
1211
1212         /* we need to walk the list backwards to get the most recent data */
1213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1214                 tdb_len_t partial;
1215
1216                 if (off+len <= el->offset) {
1217                         continue;
1218                 }
1219                 if (off >= el->offset + el->length) {
1220                         continue;
1221                 }
1222
1223                 /* an overlapping read - needs to be split into up to
1224                    2 reads and a memcpy */
1225                 if (off < el->offset) {
1226                         partial = el->offset - off;
1227                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1228                                 goto fail;
1229                         }
1230                         len -= partial;
1231                         off += partial;
1232                         buf = (void *)(partial + (char *)buf);
1233                 }
1234                 if (off + len <= el->offset + el->length) {
1235                         partial = len;
1236                 } else {
1237                         partial = el->offset + el->length - off;
1238                 }
1239                 memcpy(buf, el->data + (off - el->offset), partial);
1240                 if (cv) {
1241                         tdb_convert(buf, len);
1242                 }
1243                 len -= partial;
1244                 off += partial;
1245                 buf = (void *)(partial + (char *)buf);
1246                 
1247                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1248                         goto fail;
1249                 }
1250
1251                 return 0;
1252         }
1253
1254         /* its not in the transaction elements - do a real read */
1255         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1256
1257 fail:
1258         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1259         tdb->ecode = TDB_ERR_IO;
1260         tdb->transaction->transaction_error = 1;
1261         return -1;
1262 }
1263
1264
1265 /*
1266   write while in a transaction
1267 */
1268 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
1269                              const void *buf, tdb_len_t len)
1270 {
1271         struct tdb_transaction_el *el, *best_el=NULL;
1272
1273         if (len == 0) {
1274                 return 0;
1275         }
1276         
1277         /* if the write is to a hash head, then update the transaction
1278            hash heads */
1279         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1280             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1281                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1282                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1283         }
1284
1285         /* first see if we can replace an existing entry */
1286         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1287                 tdb_len_t partial;
1288
1289                 if (best_el == NULL && off == el->offset+el->length) {
1290                         best_el = el;
1291                 }
1292
1293                 if (off+len <= el->offset) {
1294                         continue;
1295                 }
1296                 if (off >= el->offset + el->length) {
1297                         continue;
1298                 }
1299
1300                 /* an overlapping write - needs to be split into up to
1301                    2 writes and a memcpy */
1302                 if (off < el->offset) {
1303                         partial = el->offset - off;
1304                         if (transaction_write(tdb, off, buf, partial) != 0) {
1305                                 goto fail;
1306                         }
1307                         len -= partial;
1308                         off += partial;
1309                         buf = (const void *)(partial + (const char *)buf);
1310                 }
1311                 if (off + len <= el->offset + el->length) {
1312                         partial = len;
1313                 } else {
1314                         partial = el->offset + el->length - off;
1315                 }
1316                 memcpy(el->data + (off - el->offset), buf, partial);
1317                 len -= partial;
1318                 off += partial;
1319                 buf = (const void *)(partial + (const char *)buf);
1320                 
1321                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1322                         goto fail;
1323                 }
1324
1325                 return 0;
1326         }
1327
1328         /* see if we can append the new entry to an existing entry */
1329         if (best_el && best_el->offset + best_el->length == off && 
1330             (off+len < tdb->transaction->old_map_size ||
1331              off > tdb->transaction->old_map_size)) {
1332                 unsigned char *data = best_el->data;
1333                 el = best_el;
1334                 el->data = (unsigned char *)realloc(el->data,
1335                                                     el->length + len);
1336                 if (el->data == NULL) {
1337                         tdb->ecode = TDB_ERR_OOM;
1338                         tdb->transaction->transaction_error = 1;
1339                         el->data = data;
1340                         return -1;
1341                 }
1342                 if (buf) {
1343                         memcpy(el->data + el->length, buf, len);
1344                 } else {
1345                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1346                 }
1347                 el->length += len;
1348                 return 0;
1349         }
1350
1351         /* add a new entry at the end of the list */
1352         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1353         if (el == NULL) {
1354                 tdb->ecode = TDB_ERR_OOM;
1355                 tdb->transaction->transaction_error = 1;                
1356                 return -1;
1357         }
1358         el->next = NULL;
1359         el->prev = tdb->transaction->elements_last;
1360         el->offset = off;
1361         el->length = len;
1362         el->data = (unsigned char *)malloc(len);
1363         if (el->data == NULL) {
1364                 free(el);
1365                 tdb->ecode = TDB_ERR_OOM;
1366                 tdb->transaction->transaction_error = 1;                
1367                 return -1;
1368         }
1369         if (buf) {
1370                 memcpy(el->data, buf, len);
1371         } else {
1372                 memset(el->data, TDB_PAD_BYTE, len);
1373         }
1374         if (el->prev) {
1375                 el->prev->next = el;
1376         } else {
1377                 tdb->transaction->elements = el;
1378         }
1379         tdb->transaction->elements_last = el;
1380         return 0;
1381
1382 fail:
1383         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1384         tdb->ecode = TDB_ERR_IO;
1385         tdb->transaction->transaction_error = 1;
1386         return -1;
1387 }
1388
1389 /*
1390   accelerated hash chain head search, using the cached hash heads
1391 */
1392 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1393 {
1394         u32 h = *chain;
1395         for (;h < tdb->header.hash_size;h++) {
1396                 /* the +1 takes account of the freelist */
1397                 if (0 != tdb->transaction->hash_heads[h+1]) {
1398                         break;
1399                 }
1400         }
1401         (*chain) = h;
1402 }
1403
1404 /*
1405   out of bounds check during a transaction
1406 */
1407 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1408 {
1409         if (len <= tdb->map_size) {
1410                 return 0;
1411         }
1412         return TDB_ERRCODE(TDB_ERR_IO, -1);
1413 }
1414
1415 /*
1416   transaction version of tdb_expand().
1417 */
1418 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
1419                                    tdb_off_t addition)
1420 {
1421         /* add a write to the transaction elements, so subsequent
1422            reads see the zero data */
1423         if (transaction_write(tdb, size, NULL, addition) != 0) {
1424                 return -1;
1425         }
1426
1427         return 0;
1428 }
1429
1430 /*
1431   brlock during a transaction - ignore them
1432 */
1433 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1434                               int rw_type, int lck_type, int probe, size_t len)
1435 {
1436         return 0;
1437 }
1438
1439 static const struct tdb_methods transaction_methods = {
1440         transaction_read,
1441         transaction_write,
1442         transaction_next_hash_chain,
1443         transaction_oob,
1444         transaction_expand_file,
1445         transaction_brlock
1446 };
1447
1448
1449 /*
1450   start a tdb transaction. No token is returned, as only a single
1451   transaction is allowed to be pending per tdb_context
1452 */
1453 int tdb_transaction_start(struct tdb_context *tdb)
1454 {
1455         /* some sanity checks */
1456         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1457                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1458                 tdb->ecode = TDB_ERR_EINVAL;
1459                 return -1;
1460         }
1461
1462         /* cope with nested tdb_transaction_start() calls */
1463         if (tdb->transaction != NULL) {
1464                 tdb->transaction->nesting++;
1465                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
1466                          tdb->transaction->nesting));
1467                 return 0;
1468         }
1469
1470         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1471                 /* the caller must not have any locks when starting a
1472                    transaction as otherwise we'll be screwed by lack
1473                    of nested locks in posix */
1474                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1475                 tdb->ecode = TDB_ERR_LOCK;
1476                 return -1;
1477         }
1478
1479         if (tdb->travlocks.next != NULL) {
1480                 /* you cannot use transactions inside a traverse (although you can use
1481                    traverse inside a transaction) as otherwise you can end up with
1482                    deadlock */
1483                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1484                 tdb->ecode = TDB_ERR_LOCK;
1485                 return -1;
1486         }
1487
1488         tdb->transaction = (struct tdb_transaction *)
1489                 calloc(sizeof(struct tdb_transaction), 1);
1490         if (tdb->transaction == NULL) {
1491                 tdb->ecode = TDB_ERR_OOM;
1492                 return -1;
1493         }
1494
1495         /* get the transaction write lock. This is a blocking lock. As
1496            discussed with Volker, there are a number of ways we could
1497            make this async, which we will probably do in the future */
1498         if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1499                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
1500                 tdb->ecode = TDB_ERR_LOCK;
1501                 SAFE_FREE(tdb->transaction);
1502                 return -1;
1503         }
1504         
1505         /* get a read lock from the freelist to the end of file. This
1506            is upgraded to a write lock during the commit */
1507         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1508                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1509                 tdb->ecode = TDB_ERR_LOCK;
1510                 goto fail;
1511         }
1512
1513         /* setup a copy of the hash table heads so the hash scan in
1514            traverse can be fast */
1515         tdb->transaction->hash_heads = (u32 *)
1516                 calloc(tdb->header.hash_size+1, sizeof(u32));
1517         if (tdb->transaction->hash_heads == NULL) {
1518                 tdb->ecode = TDB_ERR_OOM;
1519                 goto fail;
1520         }
1521         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1522                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1523                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1524                 tdb->ecode = TDB_ERR_IO;
1525                 goto fail;
1526         }
1527
1528         /* make sure we know about any file expansions already done by
1529            anyone else */
1530         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1531         tdb->transaction->old_map_size = tdb->map_size;
1532
1533         /* finally hook the io methods, replacing them with
1534            transaction specific methods */
1535         tdb->transaction->io_methods = tdb->methods;
1536         tdb->methods = &transaction_methods;
1537
1538         /* by calling this transaction write here, we ensure that we don't grow the
1539            transaction linked list due to hash table updates */
1540         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
1541                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1542                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1543                 tdb->ecode = TDB_ERR_IO;
1544                 goto fail;
1545         }
1546
1547         return 0;
1548         
1549 fail:
1550         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1551         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1552         SAFE_FREE(tdb->transaction->hash_heads);
1553         SAFE_FREE(tdb->transaction);
1554         return -1;
1555 }
1556
1557
1558 /*
1559   cancel the current transaction
1560 */
1561 int tdb_transaction_cancel(struct tdb_context *tdb)
1562 {       
1563         if (tdb->transaction == NULL) {
1564                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1565                 return -1;
1566         }
1567
1568         if (tdb->transaction->nesting != 0) {
1569                 tdb->transaction->transaction_error = 1;
1570                 tdb->transaction->nesting--;
1571                 return 0;
1572         }               
1573
1574         tdb->map_size = tdb->transaction->old_map_size;
1575
1576         /* free all the transaction elements */
1577         while (tdb->transaction->elements) {
1578                 struct tdb_transaction_el *el = tdb->transaction->elements;
1579                 tdb->transaction->elements = el->next;
1580                 free(el->data);
1581                 free(el);
1582         }
1583
1584         /* remove any global lock created during the transaction */
1585         if (tdb->global_lock.count != 0) {
1586                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1587                 tdb->global_lock.count = 0;
1588         }
1589
1590         /* remove any locks created during the transaction */
1591         if (tdb->num_locks != 0) {
1592                 int i;
1593                 for (i=0;i<tdb->num_lockrecs;i++) {
1594                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1595                                    F_UNLCK,F_SETLKW, 0, 1);
1596                 }
1597                 tdb->num_locks = 0;
1598         }
1599
1600         /* restore the normal io methods */
1601         tdb->methods = tdb->transaction->io_methods;
1602
1603         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1604         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1605         SAFE_FREE(tdb->transaction->hash_heads);
1606         SAFE_FREE(tdb->transaction);
1607         
1608         return 0;
1609 }
1610
1611 /*
1612   sync to disk
1613 */
1614 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1615 {       
1616         if (fsync(tdb->fd) != 0) {
1617                 tdb->ecode = TDB_ERR_IO;
1618                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1619                 return -1;
1620         }
1621 #ifdef MS_SYNC
1622         if (tdb->map_ptr) {
1623                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1624                 if (msync(moffset + (char *)tdb->map_ptr, 
1625                           length + (offset - moffset), MS_SYNC) != 0) {
1626                         tdb->ecode = TDB_ERR_IO;
1627                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1628                                  strerror(errno)));
1629                         return -1;
1630                 }
1631         }
1632 #endif
1633         return 0;
1634 }
1635
1636
1637 /*
1638   work out how much space the linearised recovery data will consume
1639 */
1640 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1641 {
1642         struct tdb_transaction_el *el;
1643         tdb_len_t recovery_size = 0;
1644
1645         recovery_size = sizeof(u32);
1646         for (el=tdb->transaction->elements;el;el=el->next) {
1647                 if (el->offset >= tdb->transaction->old_map_size) {
1648                         continue;
1649                 }
1650                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1651         }
1652
1653         return recovery_size;
1654 }
1655
1656 /*
1657   allocate the recovery area, or use an existing recovery area if it is
1658   large enough
1659 */
1660 static int tdb_recovery_allocate(struct tdb_context *tdb, 
1661                                  tdb_len_t *recovery_size,
1662                                  tdb_off_t *recovery_offset,
1663                                  tdb_len_t *recovery_max_size)
1664 {
1665         struct list_struct rec;
1666         const struct tdb_methods *methods = tdb->transaction->io_methods;
1667         tdb_off_t recovery_head;
1668
1669         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1670                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1671                 return -1;
1672         }
1673
1674         rec.rec_len = 0;
1675
1676         if (recovery_head != 0 && 
1677             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1678                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1679                 return -1;
1680         }
1681
1682         *recovery_size = tdb_recovery_size(tdb);
1683
1684         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1685                 /* it fits in the existing area */
1686                 *recovery_max_size = rec.rec_len;
1687                 *recovery_offset = recovery_head;
1688                 return 0;
1689         }
1690
1691         /* we need to free up the old recovery area, then allocate a
1692            new one at the end of the file. Note that we cannot use
1693            tdb_allocate() to allocate the new one as that might return
1694            us an area that is being currently used (as of the start of
1695            the transaction) */
1696         if (recovery_head != 0) {
1697                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1698                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1699                         return -1;
1700                 }
1701         }
1702
1703         /* the tdb_free() call might have increased the recovery size */
1704         *recovery_size = tdb_recovery_size(tdb);
1705
1706         /* round up to a multiple of page size */
1707         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1708         *recovery_offset = tdb->map_size;
1709         recovery_head = *recovery_offset;
1710
1711         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
1712                                      (tdb->map_size - tdb->transaction->old_map_size) +
1713                                      sizeof(rec) + *recovery_max_size) == -1) {
1714                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1715                 return -1;
1716         }
1717
1718         /* remap the file (if using mmap) */
1719         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1720
1721         /* we have to reset the old map size so that we don't try to expand the file
1722            again in the transaction commit, which would destroy the recovery area */
1723         tdb->transaction->old_map_size = tdb->map_size;
1724
1725         /* write the recovery header offset and sync - we can sync without a race here
1726            as the magic ptr in the recovery record has not been set */
1727         CONVERT(recovery_head);
1728         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
1729                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1730                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1731                 return -1;
1732         }
1733
1734         return 0;
1735 }
1736
1737
1738 /*
1739   setup the recovery data that will be used on a crash during commit
1740 */
1741 static int transaction_setup_recovery(struct tdb_context *tdb, 
1742                                       tdb_off_t *magic_offset)
1743 {
1744         struct tdb_transaction_el *el;
1745         tdb_len_t recovery_size;
1746         unsigned char *data, *p;
1747         const struct tdb_methods *methods = tdb->transaction->io_methods;
1748         struct list_struct *rec;
1749         tdb_off_t recovery_offset, recovery_max_size;
1750         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1751         u32 magic, tailer;
1752
1753         /*
1754           check that the recovery area has enough space
1755         */
1756         if (tdb_recovery_allocate(tdb, &recovery_size, 
1757                                   &recovery_offset, &recovery_max_size) == -1) {
1758                 return -1;
1759         }
1760
1761         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1762         if (data == NULL) {
1763                 tdb->ecode = TDB_ERR_OOM;
1764                 return -1;
1765         }
1766
1767         rec = (struct list_struct *)data;
1768         memset(rec, 0, sizeof(*rec));
1769
1770         rec->magic    = 0;
1771         rec->data_len = recovery_size;
1772         rec->rec_len  = recovery_max_size;
1773         rec->key_len  = old_map_size;
1774         CONVERT(rec);
1775
1776         /* build the recovery data into a single blob to allow us to do a single
1777            large write, which should be more efficient */
1778         p = data + sizeof(*rec);
1779         for (el=tdb->transaction->elements;el;el=el->next) {
1780                 if (el->offset >= old_map_size) {
1781                         continue;
1782                 }
1783                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1784                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1785                         free(data);
1786                         tdb->ecode = TDB_ERR_CORRUPT;
1787                         return -1;
1788                 }
1789                 memcpy(p, &el->offset, 4);
1790                 memcpy(p+4, &el->length, 4);
1791                 if (DOCONV()) {
1792                         tdb_convert(p, 8);
1793                 }
1794                 /* the recovery area contains the old data, not the
1795                    new data, so we have to call the original tdb_read
1796                    method to get it */
1797                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1798                         free(data);
1799                         tdb->ecode = TDB_ERR_IO;
1800                         return -1;
1801                 }
1802                 p += 8 + el->length;
1803         }
1804
1805         /* and the tailer */
1806         tailer = sizeof(*rec) + recovery_max_size;
1807         memcpy(p, &tailer, 4);
1808         CONVERT(p);
1809
1810         /* write the recovery data to the recovery area */
1811         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1812                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1813                 free(data);
1814                 tdb->ecode = TDB_ERR_IO;
1815                 return -1;
1816         }
1817
1818         /* as we don't have ordered writes, we have to sync the recovery
1819            data before we update the magic to indicate that the recovery
1820            data is present */
1821         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1822                 free(data);
1823                 return -1;
1824         }
1825
1826         free(data);
1827
1828         magic = TDB_RECOVERY_MAGIC;
1829         CONVERT(magic);
1830
1831         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1832
1833         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1834                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1835                 tdb->ecode = TDB_ERR_IO;
1836                 return -1;
1837         }
1838
1839         /* ensure the recovery magic marker is on disk */
1840         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1841                 return -1;
1842         }
1843
1844         return 0;
1845 }
1846
1847 /*
1848   commit the current transaction
1849 */
1850 int tdb_transaction_commit(struct tdb_context *tdb)
1851 {       
1852         const struct tdb_methods *methods;
1853         tdb_off_t magic_offset = 0;
1854         u32 zero = 0;
1855
1856         if (tdb->transaction == NULL) {
1857                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1858                 return -1;
1859         }
1860
1861         if (tdb->transaction->transaction_error) {
1862                 tdb->ecode = TDB_ERR_IO;
1863                 tdb_transaction_cancel(tdb);
1864                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1865                 return -1;
1866         }
1867
1868         if (tdb->transaction->nesting != 0) {
1869                 tdb->transaction->nesting--;
1870                 return 0;
1871         }               
1872
1873         /* check for a null transaction */
1874         if (tdb->transaction->elements == NULL) {
1875                 tdb_transaction_cancel(tdb);
1876                 return 0;
1877         }
1878
1879         methods = tdb->transaction->io_methods;
1880         
1881         /* if there are any locks pending then the caller has not
1882            nested their locks properly, so fail the transaction */
1883         if (tdb->num_locks || tdb->global_lock.count) {
1884                 tdb->ecode = TDB_ERR_LOCK;
1885                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
1886                 tdb_transaction_cancel(tdb);
1887                 return -1;
1888         }
1889
1890         /* upgrade the main transaction lock region to a write lock */
1891         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
1892                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
1893                 tdb->ecode = TDB_ERR_LOCK;
1894                 tdb_transaction_cancel(tdb);
1895                 return -1;
1896         }
1897
1898         /* get the global lock - this prevents new users attaching to the database
1899            during the commit */
1900         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1901                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
1902                 tdb->ecode = TDB_ERR_LOCK;
1903                 tdb_transaction_cancel(tdb);
1904                 return -1;
1905         }
1906
1907         if (!(tdb->flags & TDB_NOSYNC)) {
1908                 /* write the recovery data to the end of the file */
1909                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
1910                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
1911                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1912                         tdb_transaction_cancel(tdb);
1913                         return -1;
1914                 }
1915         }
1916
1917         /* expand the file to the new size if needed */
1918         if (tdb->map_size != tdb->transaction->old_map_size) {
1919                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
1920                                              tdb->map_size - 
1921                                              tdb->transaction->old_map_size) == -1) {
1922                         tdb->ecode = TDB_ERR_IO;
1923                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
1924                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1925                         tdb_transaction_cancel(tdb);
1926                         return -1;
1927                 }
1928                 tdb->map_size = tdb->transaction->old_map_size;
1929                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1930         }
1931
1932         /* perform all the writes */
1933         while (tdb->transaction->elements) {
1934                 struct tdb_transaction_el *el = tdb->transaction->elements;
1935
1936                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
1937                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1938                         
1939                         /* we've overwritten part of the data and
1940                            possibly expanded the file, so we need to
1941                            run the crash recovery code */
1942                         tdb->methods = methods;
1943                         tdb_transaction_recover(tdb); 
1944
1945                         tdb_transaction_cancel(tdb);
1946                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1947
1948                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1949                         return -1;
1950                 }
1951                 tdb->transaction->elements = el->next;
1952                 free(el->data); 
1953                 free(el);
1954         } 
1955
1956         if (!(tdb->flags & TDB_NOSYNC)) {
1957                 /* ensure the new data is on disk */
1958                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1959                         return -1;
1960                 }
1961
1962                 /* remove the recovery marker */
1963                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
1964                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
1965                         return -1;
1966                 }
1967
1968                 /* ensure the recovery marker has been removed on disk */
1969                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
1970                         return -1;
1971                 }
1972         }
1973
1974         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1975
1976         /*
1977           TODO: maybe write to some dummy hdr field, or write to magic
1978           offset without mmap, before the last sync, instead of the
1979           utime() call
1980         */
1981
1982         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1983            don't change the mtime of the file, this means the file may
1984            not be backed up (as tdb rounding to block sizes means that
1985            file size changes are quite rare too). The following forces
1986            mtime changes when a transaction completes */
1987 #ifdef HAVE_UTIME
1988         utime(tdb->name, NULL);
1989 #endif
1990
1991         /* use a transaction cancel to free memory and remove the
1992            transaction locks */
1993         tdb_transaction_cancel(tdb);
1994         return 0;
1995 }
1996
1997
1998 /*
1999   recover from an aborted transaction. Must be called with exclusive
2000   database write access already established (including the global
2001   lock to prevent new processes attaching)
2002 */
2003 int tdb_transaction_recover(struct tdb_context *tdb)
2004 {
2005         tdb_off_t recovery_head, recovery_eof;
2006         unsigned char *data, *p;
2007         u32 zero = 0;
2008         struct list_struct rec;
2009
2010         /* find the recovery area */
2011         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2012                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2013                 tdb->ecode = TDB_ERR_IO;
2014                 return -1;
2015         }
2016
2017         if (recovery_head == 0) {
2018                 /* we have never allocated a recovery record */
2019                 return 0;
2020         }
2021
2022         /* read the recovery record */
2023         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
2024                                    sizeof(rec), DOCONV()) == -1) {
2025                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
2026                 tdb->ecode = TDB_ERR_IO;
2027                 return -1;
2028         }
2029
2030         if (rec.magic != TDB_RECOVERY_MAGIC) {
2031                 /* there is no valid recovery data */
2032                 return 0;
2033         }
2034
2035         if (tdb->read_only) {
2036                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2037                 tdb->ecode = TDB_ERR_CORRUPT;
2038                 return -1;
2039         }
2040
2041         recovery_eof = rec.key_len;
2042
2043         data = (unsigned char *)malloc(rec.data_len);
2044         if (data == NULL) {
2045                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
2046                 tdb->ecode = TDB_ERR_OOM;
2047                 return -1;
2048         }
2049
2050         /* read the full recovery data */
2051         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2052                                    rec.data_len, 0) == -1) {
2053                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
2054                 tdb->ecode = TDB_ERR_IO;
2055                 return -1;
2056         }
2057
2058         /* recover the file data */
2059         p = data;
2060         while (p+8 < data + rec.data_len) {
2061                 u32 ofs, len;
2062                 if (DOCONV()) {
2063                         tdb_convert(p, 8);
2064                 }
2065                 memcpy(&ofs, p, 4);
2066                 memcpy(&len, p+4, 4);
2067
2068                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2069                         free(data);
2070                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2071                         tdb->ecode = TDB_ERR_IO;
2072                         return -1;
2073                 }
2074                 p += 8 + len;
2075         }
2076
2077         free(data);
2078
2079         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2080                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2081                 tdb->ecode = TDB_ERR_IO;
2082                 return -1;
2083         }
2084
2085         /* if the recovery area is after the recovered eof then remove it */
2086         if (recovery_eof <= recovery_head) {
2087                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2088                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2089                         tdb->ecode = TDB_ERR_IO;
2090                         return -1;                      
2091                 }
2092         }
2093
2094         /* remove the recovery magic */
2095         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
2096                           &zero) == -1) {
2097                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2098                 tdb->ecode = TDB_ERR_IO;
2099                 return -1;                      
2100         }
2101         
2102         /* reduce the file size to the old size */
2103         tdb_munmap(tdb);
2104         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2105                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2106                 tdb->ecode = TDB_ERR_IO;
2107                 return -1;                      
2108         }
2109         tdb->map_size = recovery_eof;
2110         tdb_mmap(tdb);
2111
2112         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2113                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2114                 tdb->ecode = TDB_ERR_IO;
2115                 return -1;
2116         }
2117
2118         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
2119                  recovery_eof));
2120
2121         /* all done */
2122         return 0;
2123 }
2124
2125 /* file: freelist.c */
2126
2127 /* read a freelist record and check for simple errors */
2128 static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2129 {
2130         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2131                 return -1;
2132
2133         if (rec->magic == TDB_MAGIC) {
2134                 /* this happens when a app is showdown while deleting a record - we should
2135                    not completely fail when this happens */
2136                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
2137                          rec->magic, off));
2138                 rec->magic = TDB_FREE_MAGIC;
2139                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2140                         return -1;
2141         }
2142
2143         if (rec->magic != TDB_FREE_MAGIC) {
2144                 /* Ensure ecode is set for log fn. */
2145                 tdb->ecode = TDB_ERR_CORRUPT;
2146                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read bad magic 0x%x at offset=%d\n", 
2147                            rec->magic, off));
2148                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2149         }
2150         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2151                 return -1;
2152         return 0;
2153 }
2154
2155
2156
2157 /* Remove an element from the freelist.  Must have alloc lock. */
2158 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2159 {
2160         tdb_off_t last_ptr, i;
2161
2162         /* read in the freelist top */
2163         last_ptr = FREELIST_TOP;
2164         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2165                 if (i == off) {
2166                         /* We've found it! */
2167                         return tdb_ofs_write(tdb, last_ptr, &next);
2168                 }
2169                 /* Follow chain (next offset is at start of record) */
2170                 last_ptr = i;
2171         }
2172         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2173         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2174 }
2175
2176
2177 /* update a record tailer (must hold allocation lock) */
2178 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2179                          const struct list_struct *rec)
2180 {
2181         tdb_off_t totalsize;
2182
2183         /* Offset of tailer from record header */
2184         totalsize = sizeof(*rec) + rec->rec_len;
2185         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2186                          &totalsize);
2187 }
2188
2189 /* Add an element into the freelist. Merge adjacent records if
2190    neccessary. */
2191 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2192 {
2193         tdb_off_t right, left;
2194
2195         /* Allocation and tailer lock */
2196         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2197                 return -1;
2198
2199         /* set an initial tailer, so if we fail we don't leave a bogus record */
2200         if (update_tailer(tdb, offset, rec) != 0) {
2201                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2202                 goto fail;
2203         }
2204
2205         /* Look right first (I'm an Australian, dammit) */
2206         right = offset + sizeof(*rec) + rec->rec_len;
2207         if (right + sizeof(*rec) <= tdb->map_size) {
2208                 struct list_struct r;
2209
2210                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2211                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2212                         goto left;
2213                 }
2214
2215                 /* If it's free, expand to include it. */
2216                 if (r.magic == TDB_FREE_MAGIC) {
2217                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2218                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2219                                 goto left;
2220                         }
2221                         rec->rec_len += sizeof(r) + r.rec_len;
2222                 }
2223         }
2224
2225 left:
2226         /* Look left */
2227         left = offset - sizeof(tdb_off_t);
2228         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2229                 struct list_struct l;
2230                 tdb_off_t leftsize;
2231                 
2232                 /* Read in tailer and jump back to header */
2233                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2234                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2235                         goto update;
2236                 }
2237
2238                 /* it could be uninitialised data */
2239                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2240                         goto update;
2241                 }
2242
2243                 left = offset - leftsize;
2244
2245                 /* Now read in record */
2246                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2247                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2248                         goto update;
2249                 }
2250
2251                 /* If it's free, expand to include it. */
2252                 if (l.magic == TDB_FREE_MAGIC) {
2253                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2254                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2255                                 goto update;
2256                         } else {
2257                                 offset = left;
2258                                 rec->rec_len += leftsize;
2259                         }
2260                 }
2261         }
2262
2263 update:
2264         if (update_tailer(tdb, offset, rec) == -1) {
2265                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2266                 goto fail;
2267         }
2268
2269         /* Now, prepend to free list */
2270         rec->magic = TDB_FREE_MAGIC;
2271
2272         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2273             tdb_rec_write(tdb, offset, rec) == -1 ||
2274             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2275                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2276                 goto fail;
2277         }
2278
2279         /* And we're done. */
2280         tdb_unlock(tdb, -1, F_WRLCK);
2281         return 0;
2282
2283  fail:
2284         tdb_unlock(tdb, -1, F_WRLCK);
2285         return -1;
2286 }
2287
2288
2289 /* 
2290    the core of tdb_allocate - called when we have decided which
2291    free list entry to use
2292  */
2293 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2294                                 struct list_struct *rec, tdb_off_t last_ptr)
2295 {
2296         struct list_struct newrec;
2297         tdb_off_t newrec_ptr;
2298
2299         memset(&newrec, '\0', sizeof(newrec));
2300
2301         /* found it - now possibly split it up  */
2302         if (rec->rec_len > length + MIN_REC_SIZE) {
2303                 /* Length of left piece */
2304                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2305                 
2306                 /* Right piece to go on free list */
2307                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2308                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2309                 
2310                 /* And left record is shortened */
2311                 rec->rec_len = length;
2312         } else {
2313                 newrec_ptr = 0;
2314         }
2315         
2316         /* Remove allocated record from the free list */
2317         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2318                 return 0;
2319         }
2320         
2321         /* Update header: do this before we drop alloc
2322            lock, otherwise tdb_free() might try to
2323            merge with us, thinking we're free.
2324            (Thanks Jeremy Allison). */
2325         rec->magic = TDB_MAGIC;
2326         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2327                 return 0;
2328         }
2329         
2330         /* Did we create new block? */
2331         if (newrec_ptr) {
2332                 /* Update allocated record tailer (we
2333                    shortened it). */
2334                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2335                         return 0;
2336                 }
2337                 
2338                 /* Free new record */
2339                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2340                         return 0;
2341                 }
2342         }
2343         
2344         /* all done - return the new record offset */
2345         return rec_ptr;
2346 }
2347
2348 /* allocate some space from the free list. The offset returned points
2349    to a unconnected list_struct within the database with room for at
2350    least length bytes of total data
2351
2352    0 is returned if the space could not be allocated
2353  */
2354 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2355 {
2356         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2357         struct {
2358                 tdb_off_t rec_ptr, last_ptr;
2359                 tdb_len_t rec_len;
2360         } bestfit;
2361
2362         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2363                 return 0;
2364
2365         /* Extra bytes required for tailer */
2366         length += sizeof(tdb_off_t);
2367
2368  again:
2369         last_ptr = FREELIST_TOP;
2370
2371         /* read in the freelist top */
2372         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2373                 goto fail;
2374
2375         bestfit.rec_ptr = 0;
2376         bestfit.last_ptr = 0;
2377         bestfit.rec_len = 0;
2378
2379         /* 
2380            this is a best fit allocation strategy. Originally we used
2381            a first fit strategy, but it suffered from massive fragmentation
2382            issues when faced with a slowly increasing record size.
2383          */
2384         while (rec_ptr) {
2385                 if (rec_free_read(tdb, rec_ptr, rec) == -1) {
2386                         goto fail;
2387                 }
2388
2389                 if (rec->rec_len >= length) {
2390                         if (bestfit.rec_ptr == 0 ||
2391                             rec->rec_len < bestfit.rec_len) {
2392                                 bestfit.rec_len = rec->rec_len;
2393                                 bestfit.rec_ptr = rec_ptr;
2394                                 bestfit.last_ptr = last_ptr;
2395                                 /* consider a fit to be good enough if
2396                                    we aren't wasting more than half
2397                                    the space */
2398                                 if (bestfit.rec_len < 2*length) {
2399                                         break;
2400                                 }
2401                         }
2402                 }
2403
2404                 /* move to the next record */
2405                 last_ptr = rec_ptr;
2406                 rec_ptr = rec->next;
2407         }
2408
2409         if (bestfit.rec_ptr != 0) {
2410                 if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2411                         goto fail;
2412                 }
2413
2414                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2415                 tdb_unlock(tdb, -1, F_WRLCK);
2416                 return newrec_ptr;
2417         }
2418
2419         /* we didn't find enough space. See if we can expand the
2420            database and if we can then try again */
2421         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2422                 goto again;
2423  fail:
2424         tdb_unlock(tdb, -1, F_WRLCK);
2425         return 0;
2426 }
2427
2428 /* file: freelistcheck.c */
2429
2430 /* Check the freelist is good and contains no loops.
2431    Very memory intensive - only do this as a consistency
2432    checker. Heh heh - uses an in memory tdb as the storage
2433    for the "seen" record list. For some reason this strikes
2434    me as extremely clever as I don't have to write another tree
2435    data structure implementation :-).
2436  */
2437
2438 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2439 {
2440         TDB_DATA key, data;
2441
2442         memset(&data, '\0', sizeof(data));
2443         key.dptr = (unsigned char *)&rec_ptr;
2444         key.dsize = sizeof(rec_ptr);
2445         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2446 }
2447
2448 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2449 {
2450         struct tdb_context *mem_tdb = NULL;
2451         struct list_struct rec;
2452         tdb_off_t rec_ptr, last_ptr;
2453         int ret = -1;
2454
2455         *pnum_entries = 0;
2456
2457         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2458                                 TDB_INTERNAL, O_RDWR, 0600);
2459         if (!mem_tdb) {
2460                 return -1;
2461         }
2462
2463         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2464                 tdb_close(mem_tdb);
2465                 return 0;
2466         }
2467
2468         last_ptr = FREELIST_TOP;
2469
2470         /* Store the FREELIST_TOP record. */
2471         if (seen_insert(mem_tdb, last_ptr) == -1) {
2472                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2473                 goto fail;
2474         }
2475
2476         /* read in the freelist top */
2477         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2478                 goto fail;
2479         }
2480
2481         while (rec_ptr) {
2482
2483                 /* If we can't store this record (we've seen it
2484                    before) then the free list has a loop and must
2485                    be corrupt. */
2486
2487                 if (seen_insert(mem_tdb, rec_ptr)) {
2488                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2489                         goto fail;
2490                 }
2491
2492                 if (rec_free_read(tdb, rec_ptr, &rec) == -1) {
2493                         goto fail;
2494                 }
2495
2496                 /* move to the next record */
2497                 last_ptr = rec_ptr;
2498                 rec_ptr = rec.next;
2499                 *pnum_entries += 1;
2500         }
2501
2502         ret = 0;
2503
2504   fail:
2505
2506         tdb_close(mem_tdb);
2507         tdb_unlock(tdb, -1, F_WRLCK);
2508         return ret;
2509 }
2510
2511 /* file: traverse.c */
2512
2513 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2514 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2515                          struct list_struct *rec)
2516 {
2517         int want_next = (tlock->off != 0);
2518
2519         /* Lock each chain from the start one. */
2520         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2521                 if (!tlock->off && tlock->hash != 0) {
2522                         /* this is an optimisation for the common case where
2523                            the hash chain is empty, which is particularly
2524                            common for the use of tdb with ldb, where large
2525                            hashes are used. In that case we spend most of our
2526                            time in tdb_brlock(), locking empty hash chains.
2527                            
2528                            To avoid this, we do an unlocked pre-check to see
2529                            if the hash chain is empty before starting to look
2530                            inside it. If it is empty then we can avoid that
2531                            hash chain. If it isn't empty then we can't believe
2532                            the value we get back, as we read it without a
2533                            lock, so instead we get the lock and re-fetch the
2534                            value below.
2535                            
2536                            Notice that not doing this optimisation on the
2537                            first hash chain is critical. We must guarantee
2538                            that we have done at least one fcntl lock at the
2539                            start of a search to guarantee that memory is
2540                            coherent on SMP systems. If records are added by
2541                            others during the search then thats OK, and we
2542                            could possibly miss those with this trick, but we
2543                            could miss them anyway without this trick, so the
2544                            semantics don't change.
2545                            
2546                            With a non-indexed ldb search this trick gains us a
2547                            factor of around 80 in speed on a linux 2.6.x
2548                            system (testing using ldbtest).
2549                         */
2550                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2551                         if (tlock->hash == tdb->header.hash_size) {
2552                                 continue;
2553                         }
2554                 }
2555
2556                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2557                         return -1;
2558
2559                 /* No previous record?  Start at top of chain. */
2560                 if (!tlock->off) {
2561                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2562                                      &tlock->off) == -1)
2563                                 goto fail;
2564                 } else {
2565                         /* Otherwise unlock the previous record. */
2566                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2567                                 goto fail;
2568                 }
2569
2570                 if (want_next) {
2571                         /* We have offset of old record: grab next */
2572                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2573                                 goto fail;
2574                         tlock->off = rec->next;
2575                 }
2576
2577                 /* Iterate through chain */
2578                 while( tlock->off) {
2579                         tdb_off_t current;
2580                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2581                                 goto fail;
2582
2583                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2584                         if (tlock->off == rec->next) {
2585                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2586                                 goto fail;
2587                         }
2588
2589                         if (!TDB_DEAD(rec)) {
2590                                 /* Woohoo: we found one! */
2591                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2592                                         goto fail;
2593                                 return tlock->off;
2594                         }
2595
2596                         /* Try to clean dead ones from old traverses */
2597                         current = tlock->off;
2598                         tlock->off = rec->next;
2599                         if (!(tdb->read_only || tdb->traverse_read) && 
2600                             tdb_do_delete(tdb, current, rec) != 0)
2601                                 goto fail;
2602                 }
2603                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2604                 want_next = 0;
2605         }
2606         /* We finished iteration without finding anything */
2607         return TDB_ERRCODE(TDB_SUCCESS, 0);
2608
2609  fail:
2610         tlock->off = 0;
2611         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2612                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2613         return -1;
2614 }
2615
2616 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2617    return -1 on error or the record count traversed
2618    if fn is NULL then it is not called
2619    a non-zero return value from fn() indicates that the traversal should stop
2620   */
2621 static int tdb_traverse_internal(struct tdb_context *tdb, 
2622                                  tdb_traverse_func fn, void *private_data,
2623                                  struct tdb_traverse_lock *tl)
2624 {
2625         TDB_DATA key, dbuf;
2626         struct list_struct rec;
2627         int ret, count = 0;
2628
2629         /* This was in the initializaton, above, but the IRIX compiler
2630          * did not like it.  crh
2631          */
2632         tl->next = tdb->travlocks.next;
2633
2634         /* fcntl locks don't stack: beware traverse inside traverse */
2635         tdb->travlocks.next = tl;
2636
2637         /* tdb_next_lock places locks on the record returned, and its chain */
2638         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2639                 count++;
2640                 /* now read the full record */
2641                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec), 
2642                                           rec.key_len + rec.data_len);
2643                 if (!key.dptr) {
2644                         ret = -1;
2645                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2646                                 goto out;
2647                         if (tdb_unlock_record(tdb, tl->off) != 0)
2648                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2649                         goto out;
2650                 }
2651                 key.dsize = rec.key_len;
2652                 dbuf.dptr = key.dptr + rec.key_len;
2653                 dbuf.dsize = rec.data_len;
2654
2655                 /* Drop chain lock, call out */
2656                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2657                         ret = -1;
2658                         SAFE_FREE(key.dptr);
2659                         goto out;
2660                 }
2661                 if (fn && fn(tdb, key, dbuf, private_data)) {
2662                         /* They want us to terminate traversal */
2663                         ret = count;
2664                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2665                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2666                                 ret = -1;
2667                         }
2668                         SAFE_FREE(key.dptr);
2669                         goto out;
2670                 }
2671                 SAFE_FREE(key.dptr);
2672         }
2673 out:
2674         tdb->travlocks.next = tl->next;
2675         if (ret < 0)
2676                 return -1;
2677         else
2678                 return count;
2679 }
2680
2681
2682 /*
2683   a write style traverse - temporarily marks the db read only
2684 */
2685 int tdb_traverse_read(struct tdb_context *tdb, 
2686                       tdb_traverse_func fn, void *private_data)
2687 {
2688         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2689         int ret;
2690         
2691         /* we need to get a read lock on the transaction lock here to
2692            cope with the lock ordering semantics of solaris10 */
2693         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) {
2694                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n"));
2695                 tdb->ecode = TDB_ERR_LOCK;
2696                 return -1;
2697         }
2698
2699         tdb->traverse_read++;
2700         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2701         tdb->traverse_read--;
2702
2703         tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2704
2705         return ret;
2706 }
2707
2708 /*
2709   a write style traverse - needs to get the transaction lock to
2710   prevent deadlocks
2711 */
2712 int tdb_traverse(struct tdb_context *tdb, 
2713                  tdb_traverse_func fn, void *private_data)
2714 {
2715         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2716         int ret;
2717
2718         if (tdb->read_only || tdb->traverse_read) {
2719                 return tdb_traverse_read(tdb, fn, private_data);
2720         }
2721         
2722         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2723                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n"));
2724                 tdb->ecode = TDB_ERR_LOCK;
2725                 return -1;
2726         }
2727
2728         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2729
2730         tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2731
2732         return ret;
2733 }
2734
2735
2736 /* find the first entry in the database and return its key */
2737 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2738 {
2739         TDB_DATA key;
2740         struct list_struct rec;
2741
2742         /* release any old lock */
2743         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2744                 return tdb_null;
2745         tdb->travlocks.off = tdb->travlocks.hash = 0;
2746         tdb->travlocks.lock_rw = F_RDLCK;
2747
2748         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2749                 return tdb_null;
2750         /* now read the key */
2751         key.dsize = rec.key_len;
2752         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2753         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
2754                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2755         return key;
2756 }
2757
2758 /* find the next entry in the database, returning its key */
2759 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2760 {
2761         u32 oldhash;
2762         TDB_DATA key = tdb_null;
2763         struct list_struct rec;
2764         unsigned char *k = NULL;
2765
2766         /* Is locked key the old key?  If so, traverse will be reliable. */
2767         if (tdb->travlocks.off) {
2768                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
2769                         return tdb_null;
2770                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2771                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2772                                             rec.key_len))
2773                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2774                         /* No, it wasn't: unlock it and start from scratch */
2775                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2776                                 SAFE_FREE(k);
2777                                 return tdb_null;
2778                         }
2779                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0) {
2780                                 SAFE_FREE(k);
2781                                 return tdb_null;
2782                         }
2783                         tdb->travlocks.off = 0;
2784                 }
2785
2786                 SAFE_FREE(k);
2787         }
2788
2789         if (!tdb->travlocks.off) {
2790                 /* No previous element: do normal find, and lock record */
2791                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
2792                 if (!tdb->travlocks.off)
2793                         return tdb_null;
2794                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2795                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2796                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2797                         return tdb_null;
2798                 }
2799         }
2800         oldhash = tdb->travlocks.hash;
2801
2802         /* Grab next record: locks chain and returned record,
2803            unlocks old record */
2804         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2805                 key.dsize = rec.key_len;
2806                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2807                                           key.dsize);
2808                 /* Unlock the chain of this new record */
2809                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
2810                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2811         }
2812         /* Unlock the chain of old record */
2813         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
2814                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2815         return key;
2816 }
2817
2818 /* file: dump.c */
2819
2820 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
2821 {
2822         struct list_struct rec;
2823         tdb_off_t tailer_ofs, tailer;
2824
2825         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec, 
2826                                    sizeof(rec), DOCONV()) == -1) {
2827                 printf("ERROR: failed to read record at %u\n", offset);
2828                 return 0;
2829         }
2830
2831         printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2832                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
2833
2834         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2835
2836         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2837                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2838                 return rec.next;
2839         }
2840
2841         if (tailer != rec.rec_len + sizeof(rec)) {
2842                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2843                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2844         }
2845         return rec.next;
2846 }
2847
2848 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2849 {
2850         tdb_off_t rec_ptr, top;
2851
2852         top = TDB_HASH_TOP(i);
2853
2854         if (tdb_lock(tdb, i, F_WRLCK) != 0)
2855                 return -1;
2856
2857         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2858                 return tdb_unlock(tdb, i, F_WRLCK);
2859
2860         if (rec_ptr)
2861                 printf("hash=%d\n", i);
2862
2863         while (rec_ptr) {
2864                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
2865         }
2866
2867         return tdb_unlock(tdb, i, F_WRLCK);
2868 }
2869
2870 void tdb_dump_all(struct tdb_context *tdb)
2871 {
2872         int i;
2873         for (i=0;i<tdb->header.hash_size;i++) {
2874                 tdb_dump_chain(tdb, i);
2875         }
2876         printf("freelist:\n");
2877         tdb_dump_chain(tdb, -1);
2878 }
2879
2880 int tdb_printfreelist(struct tdb_context *tdb)
2881 {
2882         int ret;
2883         long total_free = 0;
2884         tdb_off_t offset, rec_ptr;
2885         struct list_struct rec;
2886
2887         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
2888                 return ret;
2889
2890         offset = FREELIST_TOP;
2891
2892         /* read in the freelist top */
2893         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
2894                 tdb_unlock(tdb, -1, F_WRLCK);
2895                 return 0;
2896         }
2897
2898         printf("freelist top=[0x%08x]\n", rec_ptr );
2899         while (rec_ptr) {
2900                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec, 
2901                                            sizeof(rec), DOCONV()) == -1) {
2902                         tdb_unlock(tdb, -1, F_WRLCK);
2903                         return -1;
2904                 }
2905
2906                 if (rec.magic != TDB_FREE_MAGIC) {
2907                         printf("bad magic 0x%08x in free list\n", rec.magic);
2908                         tdb_unlock(tdb, -1, F_WRLCK);
2909                         return -1;
2910                 }
2911
2912                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n", 
2913                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
2914                 total_free += rec.rec_len;
2915
2916                 /* move to the next record */
2917                 rec_ptr = rec.next;
2918         }
2919         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
2920                (int)total_free);
2921
2922         return tdb_unlock(tdb, -1, F_WRLCK);
2923 }
2924
2925 /* file: tdb.c */
2926
2927 TDB_DATA tdb_null;
2928
2929 /*
2930   increment the tdb sequence number if the tdb has been opened using
2931   the TDB_SEQNUM flag
2932 */
2933 static void tdb_increment_seqnum(struct tdb_context *tdb)
2934 {
2935         tdb_off_t seqnum=0;
2936         
2937         if (!(tdb->flags & TDB_SEQNUM)) {
2938                 return;
2939         }
2940
2941         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
2942                 return;
2943         }
2944
2945         /* we ignore errors from this, as we have no sane way of
2946            dealing with them.
2947         */
2948         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
2949         seqnum++;
2950         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
2951
2952         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
2953 }
2954
2955 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
2956 {
2957         return memcmp(data.dptr, key.dptr, data.dsize);
2958 }
2959
2960 /* Returns 0 on fail.  On success, return offset of record, and fills
2961    in rec */
2962 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
2963                         struct list_struct *r)
2964 {
2965         tdb_off_t rec_ptr;
2966         
2967         /* read in the hash top */
2968         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
2969                 return 0;
2970
2971         /* keep looking until we find the right record */
2972         while (rec_ptr) {
2973                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
2974                         return 0;
2975
2976                 if (!TDB_DEAD(r) && hash==r->full_hash
2977                     && key.dsize==r->key_len
2978                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
2979                                       r->key_len, tdb_key_compare,
2980                                       NULL) == 0) {
2981                         return rec_ptr;
2982                 }
2983                 rec_ptr = r->next;
2984         }
2985         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
2986 }
2987
2988 /* As tdb_find, but if you succeed, keep the lock */
2989 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
2990                            struct list_struct *rec)
2991 {
2992         u32 rec_ptr;
2993
2994         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
2995                 return 0;
2996         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
2997                 tdb_unlock(tdb, BUCKET(hash), locktype);
2998         return rec_ptr;
2999 }
3000
3001
3002 /* update an entry in place - this only works if the new data size
3003    is <= the old data size and the key exists.
3004    on failure return -1.
3005 */
3006 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3007 {
3008         struct list_struct rec;
3009         tdb_off_t rec_ptr;
3010
3011         /* find entry */
3012         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3013                 return -1;
3014
3015         /* must be long enough key, data and tailer */
3016         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3017                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3018                 return -1;
3019         }
3020
3021         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3022                       dbuf.dptr, dbuf.dsize) == -1)
3023                 return -1;
3024
3025         if (dbuf.dsize != rec.data_len) {
3026                 /* update size */
3027                 rec.data_len = dbuf.dsize;
3028                 return tdb_rec_write(tdb, rec_ptr, &rec);
3029         }
3030  
3031         return 0;
3032 }
3033
3034 /* find an entry in the database given a key */
3035 /* If an entry doesn't exist tdb_err will be set to
3036  * TDB_ERR_NOEXIST. If a key has no data attached
3037  * then the TDB_DATA will have zero length but
3038  * a non-zero pointer
3039  */
3040 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3041 {
3042         tdb_off_t rec_ptr;
3043         struct list_struct rec;
3044         TDB_DATA ret;
3045         u32 hash;
3046
3047         /* find which hash bucket it is in */
3048         hash = tdb->hash_fn(&key);
3049         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3050                 return tdb_null;
3051
3052         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3053                                   rec.data_len);
3054         ret.dsize = rec.data_len;
3055         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3056         return ret;
3057 }
3058
3059 /*
3060  * Find an entry in the database and hand the record's data to a parsing
3061  * function. The parsing function is executed under the chain read lock, so it
3062  * should be fast and should not block on other syscalls.
3063  *
3064  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3065  *
3066  * For mmapped tdb's that do not have a transaction open it points the parsing
3067  * function directly at the mmap area, it avoids the malloc/memcpy in this
3068  * case. If a transaction is open or no mmap is available, it has to do
3069  * malloc/read/parse/free.
3070  *
3071  * This is interesting for all readers of potentially large data structures in
3072  * the tdb records, ldb indexes being one example.
3073  */
3074
3075 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3076                      int (*parser)(TDB_DATA key, TDB_DATA data,
3077                                    void *private_data),
3078                      void *private_data)
3079 {
3080         tdb_off_t rec_ptr;
3081         struct list_struct rec;
3082         int ret;
3083         u32 hash;
3084
3085         /* find which hash bucket it is in */
3086         hash = tdb->hash_fn(&key);
3087
3088         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3089                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3090         }
3091
3092         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3093                              rec.data_len, parser, private_data);
3094
3095         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3096
3097         return ret;
3098 }
3099
3100 /* check if an entry in the database exists 
3101
3102    note that 1 is returned if the key is found and 0 is returned if not found
3103    this doesn't match the conventions in the rest of this module, but is
3104    compatible with gdbm
3105 */
3106 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3107 {
3108         struct list_struct rec;
3109         
3110         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3111                 return 0;
3112         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3113         return 1;
3114 }
3115
3116 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3117 {
3118         u32 hash = tdb->hash_fn(&key);
3119         return tdb_exists_hash(tdb, key, hash);
3120 }
3121
3122 /* actually delete an entry in the database given the offset */
3123 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3124 {
3125         tdb_off_t last_ptr, i;
3126         struct list_struct lastrec;
3127
3128         if (tdb->read_only || tdb->traverse_read) return -1;
3129
3130         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3131                 /* Someone traversing here: mark it as dead */
3132                 rec->magic = TDB_DEAD_MAGIC;
3133                 return tdb_rec_write(tdb, rec_ptr, rec);
3134         }
3135         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3136                 return -1;
3137
3138         /* find previous record in hash chain */
3139         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3140                 return -1;
3141         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3142                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3143                         return -1;
3144
3145         /* unlink it: next ptr is at start of record. */
3146         if (last_ptr == 0)
3147                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3148         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3149                 return -1;
3150
3151         /* recover the space */
3152         if (tdb_free(tdb, rec_ptr, rec) == -1)
3153                 return -1;
3154         return 0;
3155 }
3156
3157 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3158 {
3159         int res = 0;
3160         tdb_off_t rec_ptr;
3161         struct list_struct rec;
3162         
3163         /* read in the hash top */
3164         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3165                 return 0;
3166
3167         while (rec_ptr) {
3168                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3169                         return 0;
3170
3171                 if (rec.magic == TDB_DEAD_MAGIC) {
3172                         res += 1;
3173                 }
3174                 rec_ptr = rec.next;
3175         }
3176         return res;
3177 }
3178
3179 /*
3180  * Purge all DEAD records from a hash chain
3181  */
3182 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3183 {
3184         int res = -1;
3185         struct list_struct rec;
3186         tdb_off_t rec_ptr;
3187
3188         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3189                 return -1;
3190         }
3191         
3192         /* read in the hash top */
3193         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3194                 goto fail;
3195
3196         while (rec_ptr) {
3197                 tdb_off_t next;
3198
3199                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3200                         goto fail;
3201                 }
3202
3203                 next = rec.next;
3204
3205                 if (rec.magic == TDB_DEAD_MAGIC
3206                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3207                         goto fail;
3208                 }
3209                 rec_ptr = next;
3210         }
3211         res = 0;
3212  fail:
3213         tdb_unlock(tdb, -1, F_WRLCK);
3214         return res;
3215 }
3216
3217 /* delete an entry in the database given a key */
3218 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3219 {
3220         tdb_off_t rec_ptr;
3221         struct list_struct rec;
3222         int ret;
3223
3224         if (tdb->max_dead_records != 0) {
3225
3226                 /*
3227                  * Allow for some dead records per hash chain, mainly for
3228                  * tdb's with a very high create/delete rate like locking.tdb.
3229                  */
3230
3231                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3232                         return -1;
3233
3234                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3235                         /*
3236                          * Don't let the per-chain freelist grow too large,
3237                          * delete all existing dead records
3238                          */
3239                         tdb_purge_dead(tdb, hash);
3240                 }
3241
3242                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3243                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3244                         return -1;
3245                 }
3246
3247                 /*
3248                  * Just mark the record as dead.
3249                  */
3250                 rec.magic = TDB_DEAD_MAGIC;
3251                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3252         }
3253         else {
3254                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3255                                                    &rec)))
3256                         return -1;
3257
3258                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3259         }
3260
3261         if (ret == 0) {
3262                 tdb_increment_seqnum(tdb);
3263         }
3264
3265         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3266                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3267         return ret;
3268 }
3269
3270 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3271 {
3272         u32 hash = tdb->hash_fn(&key);
3273         return tdb_delete_hash(tdb, key, hash);
3274 }
3275
3276 /*
3277  * See if we have a dead record around with enough space
3278  */
3279 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3280                                struct list_struct *r, tdb_len_t length)
3281 {
3282         tdb_off_t rec_ptr;
3283         
3284         /* read in the hash top */
3285         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3286                 return 0;
3287
3288         /* keep looking until we find the right record */
3289         while (rec_ptr) {
3290                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3291                         return 0;
3292
3293                 if (TDB_DEAD(r) && r->rec_len >= length) {
3294                         /*
3295                          * First fit for simple coding, TODO: change to best
3296                          * fit
3297                          */
3298                         return rec_ptr;
3299                 }
3300                 rec_ptr = r->next;
3301         }
3302         return 0;
3303 }
3304
3305 /* store an element in the database, replacing any existing element
3306    with the same key 
3307
3308    return 0 on success, -1 on failure
3309 */
3310 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3311 {
3312         struct list_struct rec;
3313         u32 hash;
3314         tdb_off_t rec_ptr;
3315         char *p = NULL;
3316         int ret = -1;
3317
3318         if (tdb->read_only || tdb->traverse_read) {
3319                 tdb->ecode = TDB_ERR_RDONLY;
3320                 return -1;
3321         }
3322
3323         /* find which hash bucket it is in */
3324         hash = tdb->hash_fn(&key);
3325         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3326                 return -1;
3327
3328         /* check for it existing, on insert. */
3329         if (flag == TDB_INSERT) {
3330                 if (tdb_exists_hash(tdb, key, hash)) {
3331                         tdb->ecode = TDB_ERR_EXISTS;
3332                         goto fail;
3333                 }
3334         } else {
3335                 /* first try in-place update, on modify or replace. */
3336                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3337                         goto done;
3338                 }
3339                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3340                     flag == TDB_MODIFY) {
3341                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3342                          we should fail the store */
3343                         goto fail;
3344                 }
3345         }
3346         /* reset the error code potentially set by the tdb_update() */
3347         tdb->ecode = TDB_SUCCESS;
3348
3349         /* delete any existing record - if it doesn't exist we don't
3350            care.  Doing this first reduces fragmentation, and avoids
3351            coalescing with `allocated' block before it's updated. */
3352         if (flag != TDB_INSERT)
3353                 tdb_delete_hash(tdb, key, hash);
3354
3355         /* Copy key+value *before* allocating free space in case malloc
3356            fails and we are left with a dead spot in the tdb. */
3357
3358         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3359                 tdb->ecode = TDB_ERR_OOM;
3360                 goto fail;
3361         }
3362
3363         memcpy(p, key.dptr, key.dsize);
3364         if (dbuf.dsize)
3365                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3366
3367         if (tdb->max_dead_records != 0) {
3368                 /*
3369                  * Allow for some dead records per hash chain, look if we can
3370                  * find one that can hold the new record. We need enough space
3371                  * for key, data and tailer. If we find one, we don't have to
3372                  * consult the central freelist.
3373                  */
3374                 rec_ptr = tdb_find_dead(
3375                         tdb, hash, &rec,
3376                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3377
3378                 if (rec_ptr != 0) {
3379                         rec.key_len = key.dsize;
3380                         rec.data_len = dbuf.dsize;
3381                         rec.full_hash = hash;
3382                         rec.magic = TDB_MAGIC;
3383                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3384                             || tdb->methods->tdb_write(
3385                                     tdb, rec_ptr + sizeof(rec),
3386                                     p, key.dsize + dbuf.dsize) == -1) {
3387                                 goto fail;
3388                         }
3389                         goto done;
3390                 }
3391         }
3392
3393         /*
3394          * We have to allocate some space from the freelist, so this means we
3395          * have to lock it. Use the chance to purge all the DEAD records from
3396          * the hash chain under the freelist lock.
3397          */
3398
3399         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3400                 goto fail;
3401         }
3402
3403         if ((tdb->max_dead_records != 0)
3404             && (tdb_purge_dead(tdb, hash) == -1)) {
3405                 tdb_unlock(tdb, -1, F_WRLCK);
3406                 goto fail;
3407         }
3408
3409         /* we have to allocate some space */
3410         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3411
3412         tdb_unlock(tdb, -1, F_WRLCK);
3413
3414         if (rec_ptr == 0) {
3415                 goto fail;
3416         }
3417
3418         /* Read hash top into next ptr */
3419         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3420                 goto fail;
3421
3422         rec.key_len = key.dsize;
3423         rec.data_len = dbuf.dsize;
3424         rec.full_hash = hash;
3425         rec.magic = TDB_MAGIC;
3426
3427         /* write out and point the top of the hash chain at it */
3428         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3429             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3430             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3431                 /* Need to tdb_unallocate() here */
3432                 goto fail;
3433         }
3434
3435  done:
3436         ret = 0;
3437  fail:
3438         if (ret == 0) {
3439                 tdb_increment_seqnum(tdb);
3440         }
3441
3442         SAFE_FREE(p); 
3443         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3444         return ret;
3445 }
3446
3447
3448 /* Append to an entry. Create if not exist. */
3449 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3450 {
3451         u32 hash;
3452         TDB_DATA dbuf;
3453         int ret = -1;
3454
3455         /* find which hash bucket it is in */
3456         hash = tdb->hash_fn(&key);
3457         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3458                 return -1;
3459
3460         dbuf = tdb_fetch(tdb, key);
3461
3462         if (dbuf.dptr == NULL) {
3463                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3464         } else {
3465                 dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
3466                                                      dbuf.dsize + new_dbuf.dsize);
3467         }
3468
3469         if (dbuf.dptr == NULL) {
3470                 tdb->ecode = TDB_ERR_OOM;
3471                 goto failed;
3472         }
3473
3474         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3475         dbuf.dsize += new_dbuf.dsize;
3476
3477         ret = tdb_store(tdb, key, dbuf, 0);
3478         
3479 failed:
3480         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3481         SAFE_FREE(dbuf.dptr);
3482         return ret;
3483 }
3484
3485
3486 /*
3487   return the name of the current tdb file
3488   useful for external logging functions
3489 */
3490 const char *tdb_name(struct tdb_context *tdb)
3491 {
3492         return tdb->name;
3493 }
3494
3495 /*
3496   return the underlying file descriptor being used by tdb, or -1
3497   useful for external routines that want to check the device/inode
3498   of the fd
3499 */
3500 int tdb_fd(struct tdb_context *tdb)
3501 {
3502         return tdb->fd;
3503 }
3504
3505 /*
3506   return the current logging function
3507   useful for external tdb routines that wish to log tdb errors
3508 */
3509 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3510 {
3511         return tdb->log.log_fn;
3512 }
3513
3514
3515 /*
3516   get the tdb sequence number. Only makes sense if the writers opened
3517   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3518   quickly, so it should only be used for a 'has something changed'
3519   test, not for code that relies on the count of the number of changes
3520   made. If you want a counter then use a tdb record.
3521
3522   The aim of this sequence number is to allow for a very lightweight
3523   test of a possible tdb change.
3524 */
3525 int tdb_get_seqnum(struct tdb_context *tdb)
3526 {
3527         tdb_off_t seqnum=0;
3528
3529         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3530         return seqnum;
3531 }
3532
3533 int tdb_hash_size(struct tdb_context *tdb)
3534 {
3535         return tdb->header.hash_size;
3536 }
3537
3538 size_t tdb_map_size(struct tdb_context *tdb)
3539 {
3540         return tdb->map_size;
3541 }
3542
3543 int tdb_get_flags(struct tdb_context *tdb)
3544 {
3545         return tdb->flags;
3546 }
3547
3548 /* file: open.c */
3549
3550 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3551 static struct tdb_context *tdbs = NULL;
3552
3553
3554 /* This is based on the hash algorithm from gdbm */
3555 static unsigned int default_tdb_hash(TDB_DATA *key)
3556 {
3557         u32 value;      /* Used to compute the hash value.  */
3558         u32   i;        /* Used to cycle through random values. */
3559
3560         /* Set the initial value from the key size. */
3561         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3562                 value = (value + (key->dptr[i] << (i*5 % 24)));
3563
3564         return (1103515243 * value + 12345);  
3565 }
3566
3567
3568 /* initialise a new database with a specified hash size */
3569 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3570 {
3571         struct tdb_header *newdb;
3572         int size, ret = -1;
3573
3574         /* We make it up in memory, then write it out if not internal */
3575         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3576         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3577                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3578
3579         /* Fill in the header */
3580         newdb->version = TDB_VERSION;
3581         newdb->hash_size = hash_size;
3582         if (tdb->flags & TDB_INTERNAL) {
3583                 tdb->map_size = size;
3584                 tdb->map_ptr = (char *)newdb;
3585                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3586                 /* Convert the `ondisk' version if asked. */
3587                 CONVERT(*newdb);
3588                 return 0;
3589         }
3590         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3591                 goto fail;
3592
3593         if (ftruncate(tdb->fd, 0) == -1)
3594                 goto fail;
3595
3596         /* This creates an endian-converted header, as if read from disk */
3597         CONVERT(*newdb);
3598         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3599         /* Don't endian-convert the magic food! */
3600         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3601         if (write(tdb->fd, newdb, size) != size) {
3602                 ret = -1;
3603         } else {
3604                 ret = 0;
3605         }
3606
3607   fail:
3608         SAFE_FREE(newdb);
3609         return ret;
3610 }
3611
3612
3613
3614 static int tdb_already_open(dev_t device,
3615                             ino_t ino)
3616 {
3617         struct tdb_context *i;
3618         
3619         for (i = tdbs; i; i = i->next) {
3620                 if (i->device == device && i->inode == ino) {
3621                         return 1;
3622                 }
3623         }
3624
3625         return 0;
3626 }
3627
3628 /* open the database, creating it if necessary 
3629
3630    The open_flags and mode are passed straight to the open call on the
3631    database file. A flags value of O_WRONLY is invalid. The hash size
3632    is advisory, use zero for a default value.
3633
3634    Return is NULL on error, in which case errno is also set.  Don't 
3635    try to call tdb_error or tdb_errname, just do strerror(errno).
3636
3637    @param name may be NULL for internal databases. */
3638 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3639                       int open_flags, mode_t mode)
3640 {
3641         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3642 }
3643
3644 /* a default logging function */
3645 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3646 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3647 {
3648 }
3649
3650
3651 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3652                                 int open_flags, mode_t mode,
3653                                 const struct tdb_logging_context *log_ctx,
3654                                 tdb_hash_func hash_fn)
3655 {
3656         struct tdb_context *tdb;
3657         struct stat st;
3658         int rev = 0, locked = 0;
3659         unsigned char *vp;
3660         u32 vertest;
3661
3662         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3663                 /* Can't log this */
3664                 errno = ENOMEM;
3665                 goto fail;
3666         }
3667         tdb_io_init(tdb);
3668         tdb->fd = -1;
3669         tdb->name = NULL;
3670         tdb->map_ptr = NULL;
3671         tdb->flags = tdb_flags;
3672         tdb->open_flags = open_flags;
3673         if (log_ctx) {
3674                 tdb->log = *log_ctx;
3675         } else {
3676                 tdb->log.log_fn = null_log_fn;
3677                 tdb->log.log_private = NULL;
3678         }
3679         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3680
3681         /* cache the page size */
3682         tdb->page_size = getpagesize();
3683         if (tdb->page_size <= 0) {
3684                 tdb->page_size = 0x2000;
3685         }
3686
3687         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3688                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3689                          name));
3690                 errno = EINVAL;
3691                 goto fail;
3692         }
3693         
3694         if (hash_size == 0)
3695                 hash_size = DEFAULT_HASH_SIZE;
3696         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3697                 tdb->read_only = 1;
3698                 /* read only databases don't do locking or clear if first */
3699                 tdb->flags |= TDB_NOLOCK;
3700                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3701         }
3702
3703         /* internal databases don't mmap or lock, and start off cleared */
3704         if (tdb->flags & TDB_INTERNAL) {
3705                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3706                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3707                 if (tdb_new_database(tdb, hash_size) != 0) {
3708                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3709                         goto fail;
3710                 }
3711                 goto internal;
3712         }
3713
3714         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3715                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3716                          name, strerror(errno)));
3717                 goto fail;      /* errno set by open(2) */
3718         }
3719
3720         /* ensure there is only one process initialising at once */
3721         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3722                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3723                          name, strerror(errno)));
3724                 goto fail;      /* errno set by tdb_brlock */
3725         }
3726
3727         /* we need to zero database if we are the only one with it open */
3728         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3729             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3730                 open_flags |= O_CREAT;
3731                 if (ftruncate(tdb->fd, 0) == -1) {
3732                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3733                                  "failed to truncate %s: %s\n",
3734                                  name, strerror(errno)));
3735                         goto fail; /* errno set by ftruncate */
3736                 }
3737         }
3738
3739         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3740             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3741             || (tdb->header.version != TDB_VERSION
3742                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3743                 /* its not a valid database - possibly initialise it */
3744                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3745                         errno = EIO; /* ie bad format or something */
3746                         goto fail;
3747                 }
3748                 rev = (tdb->flags & TDB_CONVERT);
3749         }
3750         vp = (unsigned char *)&tdb->header.version;
3751         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3752                   (((u32)vp[2]) << 8) | (u32)vp[3];
3753         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3754         if (!rev)
3755                 tdb->flags &= ~TDB_CONVERT;
3756         else {
3757                 tdb->flags |= TDB_CONVERT;
3758                 tdb_convert(&tdb->header, sizeof(tdb->header));
3759         }
3760         if (fstat(tdb->fd, &st) == -1)
3761                 goto fail;
3762
3763         if (tdb->header.rwlocks != 0) {
3764                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3765                 goto fail;
3766         }
3767
3768         /* Is it already in the open list?  If so, fail. */
3769         if (tdb_already_open(st.st_dev, st.st_ino)) {
3770                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3771                          "%s (%d,%d) is already open in this process\n",
3772                          name, (int)st.st_dev, (int)st.st_ino));
3773                 errno = EBUSY;
3774                 goto fail;
3775         }
3776
3777         if (!(tdb->name = (char *)strdup(name))) {
3778                 errno = ENOMEM;
3779                 goto fail;
3780         }
3781
3782         tdb->map_size = st.st_size;
3783         tdb->device = st.st_dev;
3784         tdb->inode = st.st_ino;
3785         tdb->max_dead_records = 0;
3786         tdb_mmap(tdb);
3787         if (locked) {
3788                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3789                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3790                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3791                                  name, strerror(errno)));
3792                         goto fail;
3793                 }
3794
3795         }
3796
3797         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3798            we didn't get the initial exclusive lock as we need to let all other
3799            users know we're using it. */
3800
3801         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3802                 /* leave this lock in place to indicate it's in use */
3803                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3804                         goto fail;
3805         }
3806
3807         /* if needed, run recovery */
3808         if (tdb_transaction_recover(tdb) == -1) {
3809                 goto fail;
3810         }
3811
3812  internal:
3813         /* Internal (memory-only) databases skip all the code above to
3814          * do with disk files, and resume here by releasing their
3815          * global lock and hooking into the active list. */
3816         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3817                 goto fail;
3818         tdb->next = tdbs;
3819         tdbs = tdb;
3820         return tdb;
3821
3822  fail:
3823         { int save_errno = errno;
3824
3825         if (!tdb)
3826                 return NULL;
3827         
3828         if (tdb->map_ptr) {
3829                 if (tdb->flags & TDB_INTERNAL)
3830                         SAFE_FREE(tdb->map_ptr);
3831                 else
3832                         tdb_munmap(tdb);
3833         }
3834         SAFE_FREE(tdb->name);
3835         if (tdb->fd != -1)
3836                 if (close(tdb->fd) != 0)
3837                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3838         SAFE_FREE(tdb);
3839         errno = save_errno;
3840         return NULL;
3841         }
3842 }
3843
3844 /*
3845  * Set the maximum number of dead records per hash chain
3846  */
3847
3848 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
3849 {
3850         tdb->max_dead_records = max_dead;
3851 }
3852
3853 /**
3854  * Close a database.
3855  *
3856  * @returns -1 for error; 0 for success.
3857  **/
3858 int tdb_close(struct tdb_context *tdb)
3859 {
3860         struct tdb_context **i;
3861         int ret = 0;
3862
3863         if (tdb->transaction) {
3864                 tdb_transaction_cancel(tdb);
3865         }
3866
3867         if (tdb->map_ptr) {
3868                 if (tdb->flags & TDB_INTERNAL)
3869                         SAFE_FREE(tdb->map_ptr);
3870                 else
3871                         tdb_munmap(tdb);
3872         }
3873         SAFE_FREE(tdb->name);
3874         if (tdb->fd != -1)
3875                 ret = close(tdb->fd);
3876         SAFE_FREE(tdb->lockrecs);
3877
3878         /* Remove from contexts list */
3879         for (i = &tdbs; *i; i = &(*i)->next) {
3880                 if (*i == tdb) {
3881                         *i = tdb->next;
3882                         break;
3883                 }
3884         }
3885
3886         memset(tdb, 0, sizeof(*tdb));
3887         SAFE_FREE(tdb);
3888
3889         return ret;
3890 }
3891
3892 /* register a loging function */
3893 void tdb_set_logging_function(struct tdb_context *tdb,
3894                               const struct tdb_logging_context *log_ctx)
3895 {
3896         tdb->log = *log_ctx;
3897 }
3898
3899 void *tdb_get_logging_private(struct tdb_context *tdb)
3900 {
3901         return tdb->log.log_private;
3902 }
3903
3904 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
3905    seek pointer from our parent and to re-establish locks */
3906 int tdb_reopen(struct tdb_context *tdb)
3907 {
3908         struct stat st;
3909
3910         if (tdb->flags & TDB_INTERNAL) {
3911                 return 0; /* Nothing to do. */
3912         }
3913
3914         if (tdb->num_locks != 0 || tdb->global_lock.count) {
3915                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
3916                 goto fail;
3917         }
3918
3919         if (tdb->transaction != 0) {
3920                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
3921                 goto fail;
3922         }
3923
3924         if (tdb_munmap(tdb) != 0) {
3925                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
3926                 goto fail;
3927         }
3928         if (close(tdb->fd) != 0)
3929                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
3930         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
3931         if (tdb->fd == -1) {
3932                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
3933                 goto fail;
3934         }
3935         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && 
3936             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
3937                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
3938                 goto fail;
3939         }
3940         if (fstat(tdb->fd, &st) != 0) {
3941                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
3942                 goto fail;
3943         }
3944         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
3945                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
3946                 goto fail;
3947         }
3948         tdb_mmap(tdb);
3949
3950         return 0;
3951
3952 fail:
3953         tdb_close(tdb);
3954         return -1;
3955 }
3956
3957 /* reopen all tdb's */
3958 int tdb_reopen_all(int parent_longlived)
3959 {
3960         struct tdb_context *tdb;
3961
3962         for (tdb=tdbs; tdb; tdb = tdb->next) {
3963                 /*
3964                  * If the parent is longlived (ie. a
3965                  * parent daemon architecture), we know
3966                  * it will keep it's active lock on a
3967                  * tdb opened with CLEAR_IF_FIRST. Thus
3968                  * for child processes we don't have to
3969                  * add an active lock. This is essential
3970                  * to improve performance on systems that
3971                  * keep POSIX locks as a non-scalable data
3972                  * structure in the kernel.
3973                  */
3974                 if (parent_longlived) {
3975                         /* Ensure no clear-if-first. */
3976                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3977                 }
3978
3979                 if (tdb_reopen(tdb) != 0)
3980                         return -1;
3981         }
3982
3983         return 0;
3984 }