Whamcloud - gitweb
Define _XOPEN_SOURCE to 600 consistently for Solaris C99 support
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /*
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3 Rev: 23590
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5 */
6  /* 
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12    
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16    
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #define _XOPEN_SOURCE 600
40
41 #include <unistd.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <stdarg.h>
45 #include <stddef.h>
46 #include <errno.h>
47 #include <string.h>
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
50 #endif
51 #include <sys/time.h>
52 #include <sys/types.h>
53 #include <time.h>
54 #ifdef HAVE_UTIME_H
55 #include <utime.h>
56 #endif
57 #include <sys/stat.h>
58 #include <sys/file.h>
59 #include <fcntl.h>
60
61 #ifdef HAVE_SYS_MMAN_H
62 #include <sys/mman.h>
63 #endif
64
65 #ifndef MAP_FILE
66 #define MAP_FILE 0
67 #endif
68
69 #ifndef MAP_FAILED
70 #define MAP_FAILED ((void *)-1)
71 #endif
72
73 #ifndef HAVE_STRDUP
74 #define strdup rep_strdup
75 static char *rep_strdup(const char *s)
76 {
77         char *ret;
78         int length;
79         if (!s)
80                 return NULL;
81
82         if (!length)
83                 length = strlen(s);
84
85         ret = malloc(length + 1);
86         if (ret) {
87                 strncpy(ret, s, length);
88                 ret[length] = '\0';
89         }
90         return ret;
91 }
92 #endif
93
94 #ifndef PRINTF_ATTRIBUTE
95 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
96 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
97  * the parameter containing the format, and a2 the index of the first
98  * argument. Note that some gcc 2.x versions don't handle this
99  * properly **/
100 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
101 #else
102 #define PRINTF_ATTRIBUTE(a1, a2)
103 #endif
104 #endif
105
106 typedef int bool;
107
108 #include "tdb.h"
109
110 #ifndef u32
111 #define u32 unsigned
112 #endif
113
114 #ifndef HAVE_GETPAGESIZE
115 #define getpagesize() 0x2000
116 #endif
117
118 typedef u32 tdb_len_t;
119 typedef u32 tdb_off_t;
120
121 #ifndef offsetof
122 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
123 #endif
124
125 #define TDB_MAGIC_FOOD "TDB file\n"
126 #define TDB_VERSION (0x26011967 + 6)
127 #define TDB_MAGIC (0x26011999U)
128 #define TDB_FREE_MAGIC (~TDB_MAGIC)
129 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
130 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
131 #define TDB_ALIGNMENT 4
132 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
133 #define DEFAULT_HASH_SIZE 131
134 #define FREELIST_TOP (sizeof(struct tdb_header))
135 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
136 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
137 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
138 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
139 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
140 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
141 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
142 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
143 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
144 #define TDB_PAD_BYTE 0x42
145 #define TDB_PAD_U32  0x42424242
146
147 /* NB assumes there is a local variable called "tdb" that is the
148  * current context, also takes doubly-parenthesized print-style
149  * argument. */
150 #define TDB_LOG(x) tdb->log.log_fn x
151
152 /* lock offsets */
153 #define GLOBAL_LOCK      0
154 #define ACTIVE_LOCK      4
155 #define TRANSACTION_LOCK 8
156
157 /* free memory if the pointer is valid and zero the pointer */
158 #ifndef SAFE_FREE
159 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
160 #endif
161
162 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
163
164 #define DOCONV() (tdb->flags & TDB_CONVERT)
165 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
166
167
168 /* the body of the database is made of one list_struct for the free space
169    plus a separate data list for each hash value */
170 struct list_struct {
171         tdb_off_t next; /* offset of the next record in the list */
172         tdb_len_t rec_len; /* total byte length of record */
173         tdb_len_t key_len; /* byte length of key */
174         tdb_len_t data_len; /* byte length of data */
175         u32 full_hash; /* the full 32 bit hash of the key */
176         u32 magic;   /* try to catch errors */
177         /* the following union is implied:
178                 union {
179                         char record[rec_len];
180                         struct {
181                                 char key[key_len];
182                                 char data[data_len];
183                         }
184                         u32 totalsize; (tailer)
185                 }
186         */
187 };
188
189
190 /* this is stored at the front of every database */
191 struct tdb_header {
192         char magic_food[32]; /* for /etc/magic */
193         u32 version; /* version of the code */
194         u32 hash_size; /* number of hash entries */
195         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
196         tdb_off_t recovery_start; /* offset of transaction recovery region */
197         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
198         tdb_off_t reserved[29];
199 };
200
201 struct tdb_lock_type {
202         int list;
203         u32 count;
204         u32 ltype;
205 };
206
207 struct tdb_traverse_lock {
208         struct tdb_traverse_lock *next;
209         u32 off;
210         u32 hash;
211         int lock_rw;
212 };
213
214
215 struct tdb_methods {
216         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
217         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
218         void (*next_hash_chain)(struct tdb_context *, u32 *);
219         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
220         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
221         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
222 };
223
224 struct tdb_context {
225         char *name; /* the name of the database */
226         void *map_ptr; /* where it is currently mapped */
227         int fd; /* open file descriptor for the database */
228         tdb_len_t map_size; /* how much space has been mapped */
229         int read_only; /* opened read-only */
230         int traverse_read; /* read-only traversal */
231         struct tdb_lock_type global_lock;
232         int num_lockrecs;
233         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
234         enum TDB_ERROR ecode; /* error code for last tdb error */
235         struct tdb_header header; /* a cached copy of the header */
236         u32 flags; /* the flags passed to tdb_open */
237         struct tdb_traverse_lock travlocks; /* current traversal locks */
238         struct tdb_context *next; /* all tdbs to avoid multiple opens */
239         dev_t device;   /* uniquely identifies this tdb */
240         ino_t inode;    /* uniquely identifies this tdb */
241         struct tdb_logging_context log;
242         unsigned int (*hash_fn)(TDB_DATA *key);
243         int open_flags; /* flags used in the open - needed by reopen */
244         unsigned int num_locks; /* number of chain locks held */
245         const struct tdb_methods *methods;
246         struct tdb_transaction *transaction;
247         int page_size;
248         int max_dead_records;
249         bool have_transaction_lock;
250 };
251
252
253 /*
254   internal prototypes
255 */
256 static int tdb_munmap(struct tdb_context *tdb);
257 static void tdb_mmap(struct tdb_context *tdb);
258 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
259 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
260 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
261 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
262 static int tdb_transaction_unlock(struct tdb_context *tdb);
263 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
264 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
265 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
266 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268 static void *tdb_convert(void *buf, u32 size);
269 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
271 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
273 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
274 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
275 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
277 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
278 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
279 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
280                    tdb_off_t offset, tdb_len_t len,
281                    int (*parser)(TDB_DATA key, TDB_DATA data,
282                                  void *private_data),
283                    void *private_data);
284 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
285                            struct list_struct *rec);
286 static void tdb_io_init(struct tdb_context *tdb);
287 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
288 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
289                       struct list_struct *rec);
290
291
292 /* file: error.c */
293
294 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
295 {
296         return tdb->ecode;
297 }
298
299 static struct tdb_errname {
300         enum TDB_ERROR ecode; const char *estring;
301 } emap[] = { {TDB_SUCCESS, "Success"},
302              {TDB_ERR_CORRUPT, "Corrupt database"},
303              {TDB_ERR_IO, "IO Error"},
304              {TDB_ERR_LOCK, "Locking error"},
305              {TDB_ERR_OOM, "Out of memory"},
306              {TDB_ERR_EXISTS, "Record exists"},
307              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
308              {TDB_ERR_EINVAL, "Invalid parameter"},
309              {TDB_ERR_NOEXIST, "Record does not exist"},
310              {TDB_ERR_RDONLY, "write not permitted"} };
311
312 /* Error string for the last tdb error */
313 const char *tdb_errorstr(struct tdb_context *tdb)
314 {
315         u32 i;
316         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
317                 if (tdb->ecode == emap[i].ecode)
318                         return emap[i].estring;
319         return "Invalid error code";
320 }
321
322 /* file: lock.c */
323
324 #define TDB_MARK_LOCK 0x80000000
325
326 /* a byte range locking function - return 0 on success
327    this functions locks/unlocks 1 byte at the specified offset.
328
329    On error, errno is also set so that errors are passed back properly
330    through tdb_open(). 
331
332    note that a len of zero means lock to end of file
333 */
334 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
335                int rw_type, int lck_type, int probe, size_t len)
336 {
337         struct flock fl;
338         int ret;
339
340         if (tdb->flags & TDB_NOLOCK) {
341                 return 0;
342         }
343
344         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
345                 tdb->ecode = TDB_ERR_RDONLY;
346                 return -1;
347         }
348
349         fl.l_type = rw_type;
350         fl.l_whence = SEEK_SET;
351         fl.l_start = offset;
352         fl.l_len = len;
353         fl.l_pid = 0;
354
355         do {
356                 ret = fcntl(tdb->fd,lck_type,&fl);
357         } while (ret == -1 && errno == EINTR);
358
359         if (ret == -1) {
360                 /* Generic lock error. errno set by fcntl.
361                  * EAGAIN is an expected return from non-blocking
362                  * locks. */
363                 if (!probe && lck_type != F_SETLK) {
364                         /* Ensure error code is set for log fun to examine. */
365                         tdb->ecode = TDB_ERR_LOCK;
366                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
367                                  tdb->fd, offset, rw_type, lck_type, (int)len));
368                 }
369                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
370         }
371         return 0;
372 }
373
374
375 /*
376   upgrade a read lock to a write lock. This needs to be handled in a
377   special way as some OSes (such as solaris) have too conservative
378   deadlock detection and claim a deadlock when progress can be
379   made. For those OSes we may loop for a while.  
380 */
381 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
382 {
383         int count = 1000;
384         while (count--) {
385                 struct timeval tv;
386                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
387                         return 0;
388                 }
389                 if (errno != EDEADLK) {
390                         break;
391                 }
392                 /* sleep for as short a time as we can - more portable than usleep() */
393                 tv.tv_sec = 0;
394                 tv.tv_usec = 1;
395                 select(0, NULL, NULL, NULL, &tv);
396         }
397         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
398         return -1;
399 }
400
401
402 /* lock a list in the database. list -1 is the alloc list */
403 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
404 {
405         struct tdb_lock_type *new_lck;
406         int i;
407         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
408
409         ltype &= ~TDB_MARK_LOCK;
410
411         /* a global lock allows us to avoid per chain locks */
412         if (tdb->global_lock.count && 
413             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
414                 return 0;
415         }
416
417         if (tdb->global_lock.count) {
418                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
419         }
420
421         if (list < -1 || list >= (int)tdb->header.hash_size) {
422                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 
423                            list, ltype));
424                 return -1;
425         }
426         if (tdb->flags & TDB_NOLOCK)
427                 return 0;
428
429         for (i=0; i<tdb->num_lockrecs; i++) {
430                 if (tdb->lockrecs[i].list == list) {
431                         if (tdb->lockrecs[i].count == 0) {
432                                 /*
433                                  * Can't happen, see tdb_unlock(). It should
434                                  * be an assert.
435                                  */
436                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
437                                          "lck->count == 0 for list %d", list));
438                         }
439                         /*
440                          * Just increment the in-memory struct, posix locks
441                          * don't stack.
442                          */
443                         tdb->lockrecs[i].count++;
444                         return 0;
445                 }
446         }
447
448         new_lck = (struct tdb_lock_type *)realloc(
449                 tdb->lockrecs,
450                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
451         if (new_lck == NULL) {
452                 errno = ENOMEM;
453                 return -1;
454         }
455         tdb->lockrecs = new_lck;
456
457         /* Since fcntl locks don't nest, we do a lock for the first one,
458            and simply bump the count for future ones */
459         if (!mark_lock &&
460             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
461                                      0, 1)) {
462                 return -1;
463         }
464
465         tdb->num_locks++;
466
467         tdb->lockrecs[tdb->num_lockrecs].list = list;
468         tdb->lockrecs[tdb->num_lockrecs].count = 1;
469         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
470         tdb->num_lockrecs += 1;
471
472         return 0;
473 }
474
475 /* lock a list in the database. list -1 is the alloc list */
476 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
477 {
478         int ret;
479         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
480         if (ret) {
481                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
482                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
483         }
484         return ret;
485 }
486
487 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
488 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
489 {
490         return _tdb_lock(tdb, list, ltype, F_SETLK);
491 }
492
493
494 /* unlock the database: returns void because it's too late for errors. */
495         /* changed to return int it may be interesting to know there
496            has been an error  --simo */
497 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
498 {
499         int ret = -1;
500         int i;
501         struct tdb_lock_type *lck = NULL;
502         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
503
504         ltype &= ~TDB_MARK_LOCK;
505
506         /* a global lock allows us to avoid per chain locks */
507         if (tdb->global_lock.count && 
508             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
509                 return 0;
510         }
511
512         if (tdb->global_lock.count) {
513                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
514         }
515
516         if (tdb->flags & TDB_NOLOCK)
517                 return 0;
518
519         /* Sanity checks */
520         if (list < -1 || list >= (int)tdb->header.hash_size) {
521                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
522                 return ret;
523         }
524
525         for (i=0; i<tdb->num_lockrecs; i++) {
526                 if (tdb->lockrecs[i].list == list) {
527                         lck = &tdb->lockrecs[i];
528                         break;
529                 }
530         }
531
532         if ((lck == NULL) || (lck->count == 0)) {
533                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
534                 return -1;
535         }
536
537         if (lck->count > 1) {
538                 lck->count--;
539                 return 0;
540         }
541
542         /*
543          * This lock has count==1 left, so we need to unlock it in the
544          * kernel. We don't bother with decrementing the in-memory array
545          * element, we're about to overwrite it with the last array element
546          * anyway.
547          */
548
549         if (mark_lock) {
550                 ret = 0;
551         } else {
552                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
553                                                F_SETLKW, 0, 1);
554         }
555         tdb->num_locks--;
556
557         /*
558          * Shrink the array by overwriting the element just unlocked with the
559          * last array element.
560          */
561
562         if (tdb->num_lockrecs > 1) {
563                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
564         }
565         tdb->num_lockrecs -= 1;
566
567         /*
568          * We don't bother with realloc when the array shrinks, but if we have
569          * a completely idle tdb we should get rid of the locked array.
570          */
571
572         if (tdb->num_lockrecs == 0) {
573                 SAFE_FREE(tdb->lockrecs);
574         }
575
576         if (ret)
577                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n")); 
578         return ret;
579 }
580
581 /*
582   get the transaction lock
583  */
584 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
585 {
586         if (tdb->have_transaction_lock || tdb->global_lock.count) {
587                 return 0;
588         }
589         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 
590                                      F_SETLKW, 0, 1) == -1) {
591                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
592                 tdb->ecode = TDB_ERR_LOCK;
593                 return -1;
594         }
595         tdb->have_transaction_lock = 1;
596         return 0;
597 }
598
599 /*
600   release the transaction lock
601  */
602 int tdb_transaction_unlock(struct tdb_context *tdb)
603 {
604         int ret;
605         if (!tdb->have_transaction_lock) {
606                 return 0;
607         }
608         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
609         if (ret == 0) {
610                 tdb->have_transaction_lock = 0;
611         }
612         return ret;
613 }
614
615
616
617
618 /* lock/unlock entire database */
619 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
620 {
621         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
622
623         ltype &= ~TDB_MARK_LOCK;
624
625         /* There are no locks on read-only dbs */
626         if (tdb->read_only || tdb->traverse_read)
627                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
628
629         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
630                 tdb->global_lock.count++;
631                 return 0;
632         }
633
634         if (tdb->global_lock.count) {
635                 /* a global lock of a different type exists */
636                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
637         }
638         
639         if (tdb->num_locks != 0) {
640                 /* can't combine global and chain locks */
641                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
642         }
643
644         if (!mark_lock &&
645             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
646                                      0, 4*tdb->header.hash_size)) {
647                 if (op == F_SETLKW) {
648                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
649                 }
650                 return -1;
651         }
652
653         tdb->global_lock.count = 1;
654         tdb->global_lock.ltype = ltype;
655
656         return 0;
657 }
658
659
660
661 /* unlock entire db */
662 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
663 {
664         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
665
666         ltype &= ~TDB_MARK_LOCK;
667
668         /* There are no locks on read-only dbs */
669         if (tdb->read_only || tdb->traverse_read) {
670                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
671         }
672
673         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
674                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
675         }
676
677         if (tdb->global_lock.count > 1) {
678                 tdb->global_lock.count--;
679                 return 0;
680         }
681
682         if (!mark_lock &&
683             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
684                                      0, 4*tdb->header.hash_size)) {
685                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
686                 return -1;
687         }
688
689         tdb->global_lock.count = 0;
690         tdb->global_lock.ltype = 0;
691
692         return 0;
693 }
694
695 /* lock entire database with write lock */
696 int tdb_lockall(struct tdb_context *tdb)
697 {
698         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
699 }
700
701 /* lock entire database with write lock - mark only */
702 int tdb_lockall_mark(struct tdb_context *tdb)
703 {
704         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
705 }
706
707 /* unlock entire database with write lock - unmark only */
708 int tdb_lockall_unmark(struct tdb_context *tdb)
709 {
710         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
711 }
712
713 /* lock entire database with write lock - nonblocking varient */
714 int tdb_lockall_nonblock(struct tdb_context *tdb)
715 {
716         return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
717 }
718
719 /* unlock entire database with write lock */
720 int tdb_unlockall(struct tdb_context *tdb)
721 {
722         return _tdb_unlockall(tdb, F_WRLCK);
723 }
724
725 /* lock entire database with read lock */
726 int tdb_lockall_read(struct tdb_context *tdb)
727 {
728         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
729 }
730
731 /* lock entire database with read lock - nonblock varient */
732 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
733 {
734         return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
735 }
736
737 /* unlock entire database with read lock */
738 int tdb_unlockall_read(struct tdb_context *tdb)
739 {
740         return _tdb_unlockall(tdb, F_RDLCK);
741 }
742
743 /* lock/unlock one hash chain. This is meant to be used to reduce
744    contention - it cannot guarantee how many records will be locked */
745 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
746 {
747         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
748 }
749
750 /* lock/unlock one hash chain, non-blocking. This is meant to be used
751    to reduce contention - it cannot guarantee how many records will be
752    locked */
753 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
754 {
755         return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
756 }
757
758 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
759 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
760 {
761         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
762 }
763
764 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
765 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
766 {
767         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
768 }
769
770 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
771 {
772         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
773 }
774
775 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
776 {
777         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
778 }
779
780 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
781 {
782         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
783 }
784
785
786
787 /* record lock stops delete underneath */
788 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
789 {
790         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
791 }
792
793 /*
794   Write locks override our own fcntl readlocks, so check it here.
795   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
796   an error to fail to get the lock here.
797 */
798 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
799 {
800         struct tdb_traverse_lock *i;
801         for (i = &tdb->travlocks; i; i = i->next)
802                 if (i->off == off)
803                         return -1;
804         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
805 }
806
807 /*
808   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
809   an error to fail to get the lock here.
810 */
811 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
812 {
813         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
814 }
815
816 /* fcntl locks don't stack: avoid unlocking someone else's */
817 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
818 {
819         struct tdb_traverse_lock *i;
820         u32 count = 0;
821
822         if (off == 0)
823                 return 0;
824         for (i = &tdb->travlocks; i; i = i->next)
825                 if (i->off == off)
826                         count++;
827         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
828 }
829
830 /* file: io.c */
831
832 /* check for an out of bounds access - if it is out of bounds then
833    see if the database has been expanded by someone else and expand
834    if necessary 
835    note that "len" is the minimum length needed for the db
836 */
837 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
838 {
839         struct stat st;
840         if (len <= tdb->map_size)
841                 return 0;
842         if (tdb->flags & TDB_INTERNAL) {
843                 if (!probe) {
844                         /* Ensure ecode is set for log fn. */
845                         tdb->ecode = TDB_ERR_IO;
846                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
847                                  (int)len, (int)tdb->map_size));
848                 }
849                 return TDB_ERRCODE(TDB_ERR_IO, -1);
850         }
851
852         if (fstat(tdb->fd, &st) == -1) {
853                 return TDB_ERRCODE(TDB_ERR_IO, -1);
854         }
855
856         if (st.st_size < (size_t)len) {
857                 if (!probe) {
858                         /* Ensure ecode is set for log fn. */
859                         tdb->ecode = TDB_ERR_IO;
860                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
861                                  (int)len, (int)st.st_size));
862                 }
863                 return TDB_ERRCODE(TDB_ERR_IO, -1);
864         }
865
866         /* Unmap, update size, remap */
867         if (tdb_munmap(tdb) == -1)
868                 return TDB_ERRCODE(TDB_ERR_IO, -1);
869         tdb->map_size = st.st_size;
870         tdb_mmap(tdb);
871         return 0;
872 }
873
874 /* write a lump of data at a specified offset */
875 static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
876                      const void *buf, tdb_len_t len)
877 {
878         if (len == 0) {
879                 return 0;
880         }
881
882         if (tdb->read_only || tdb->traverse_read) {
883                 tdb->ecode = TDB_ERR_RDONLY;
884                 return -1;
885         }
886
887         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
888                 return -1;
889
890         if (tdb->map_ptr) {
891                 memcpy(off + (char *)tdb->map_ptr, buf, len);
892         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
893                 /* Ensure ecode is set for log fn. */
894                 tdb->ecode = TDB_ERR_IO;
895                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
896                            off, len, strerror(errno)));
897                 return TDB_ERRCODE(TDB_ERR_IO, -1);
898         }
899         return 0;
900 }
901
902 /* Endian conversion: we only ever deal with 4 byte quantities */
903 void *tdb_convert(void *buf, u32 size)
904 {
905         u32 i, *p = (u32 *)buf;
906         for (i = 0; i < size / 4; i++)
907                 p[i] = TDB_BYTEREV(p[i]);
908         return buf;
909 }
910
911
912 /* read a lump of data at a specified offset, maybe convert */
913 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
914                     tdb_len_t len, int cv)
915 {
916         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
917                 return -1;
918         }
919
920         if (tdb->map_ptr) {
921                 memcpy(buf, off + (char *)tdb->map_ptr, len);
922         } else {
923                 ssize_t ret = pread(tdb->fd, buf, len, off);
924                 if (ret != (ssize_t)len) {
925                         /* Ensure ecode is set for log fn. */
926                         tdb->ecode = TDB_ERR_IO;
927                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
928                                  "len=%d ret=%d (%s) map_size=%d\n",
929                                  (int)off, (int)len, (int)ret, strerror(errno),
930                                  (int)tdb->map_size));
931                         return TDB_ERRCODE(TDB_ERR_IO, -1);
932                 }
933         }
934         if (cv) {
935                 tdb_convert(buf, len);
936         }
937         return 0;
938 }
939
940
941
942 /*
943   do an unlocked scan of the hash table heads to find the next non-zero head. The value
944   will then be confirmed with the lock held
945 */              
946 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
947 {
948         u32 h = *chain;
949         if (tdb->map_ptr) {
950                 for (;h < tdb->header.hash_size;h++) {
951                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
952                                 break;
953                         }
954                 }
955         } else {
956                 u32 off=0;
957                 for (;h < tdb->header.hash_size;h++) {
958                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
959                                 break;
960                         }
961                 }
962         }
963         (*chain) = h;
964 }
965
966
967 int tdb_munmap(struct tdb_context *tdb)
968 {
969         if (tdb->flags & TDB_INTERNAL)
970                 return 0;
971
972 #ifdef HAVE_MMAP
973         if (tdb->map_ptr) {
974                 int ret = munmap(tdb->map_ptr, tdb->map_size);
975                 if (ret != 0)
976                         return ret;
977         }
978 #endif
979         tdb->map_ptr = NULL;
980         return 0;
981 }
982
983 void tdb_mmap(struct tdb_context *tdb)
984 {
985         if (tdb->flags & TDB_INTERNAL)
986                 return;
987
988 #ifdef HAVE_MMAP
989         if (!(tdb->flags & TDB_NOMMAP)) {
990                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
991                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
992                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
993
994                 /*
995                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
996                  */
997
998                 if (tdb->map_ptr == MAP_FAILED) {
999                         tdb->map_ptr = NULL;
1000                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n", 
1001                                  tdb->map_size, strerror(errno)));
1002                 }
1003         } else {
1004                 tdb->map_ptr = NULL;
1005         }
1006 #else
1007         tdb->map_ptr = NULL;
1008 #endif
1009 }
1010
1011 /* expand a file.  we prefer to use ftruncate, as that is what posix
1012   says to use for mmap expansion */
1013 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1014 {
1015         char buf[1024];
1016
1017         if (tdb->read_only || tdb->traverse_read) {
1018                 tdb->ecode = TDB_ERR_RDONLY;
1019                 return -1;
1020         }
1021
1022         if (ftruncate(tdb->fd, size+addition) == -1) {
1023                 char b = 0;
1024                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1025                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n", 
1026                                  size+addition, strerror(errno)));
1027                         return -1;
1028                 }
1029         }
1030
1031         /* now fill the file with something. This ensures that the
1032            file isn't sparse, which would be very bad if we ran out of
1033            disk. This must be done with write, not via mmap */
1034         memset(buf, TDB_PAD_BYTE, sizeof(buf));
1035         while (addition) {
1036                 int n = addition>sizeof(buf)?sizeof(buf):addition;
1037                 int ret = pwrite(tdb->fd, buf, n, size);
1038                 if (ret != n) {
1039                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n", 
1040                                    n, strerror(errno)));
1041                         return -1;
1042                 }
1043                 addition -= n;
1044                 size += n;
1045         }
1046         return 0;
1047 }
1048
1049
1050 /* expand the database at least size bytes by expanding the underlying
1051    file and doing the mmap again if necessary */
1052 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1053 {
1054         struct list_struct rec;
1055         tdb_off_t offset;
1056
1057         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1058                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1059                 return -1;
1060         }
1061
1062         /* must know about any previous expansions by another process */
1063         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1064
1065         /* always make room for at least 10 more records, and round
1066            the database up to a multiple of the page size */
1067         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1068
1069         if (!(tdb->flags & TDB_INTERNAL))
1070                 tdb_munmap(tdb);
1071
1072         /*
1073          * We must ensure the file is unmapped before doing this
1074          * to ensure consistency with systems like OpenBSD where
1075          * writes and mmaps are not consistent.
1076          */
1077
1078         /* expand the file itself */
1079         if (!(tdb->flags & TDB_INTERNAL)) {
1080                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1081                         goto fail;
1082         }
1083
1084         tdb->map_size += size;
1085
1086         if (tdb->flags & TDB_INTERNAL) {
1087                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1088                                                     tdb->map_size);
1089                 if (!new_map_ptr) {
1090                         tdb->map_size -= size;
1091                         goto fail;
1092                 }
1093                 tdb->map_ptr = new_map_ptr;
1094         } else {
1095                 /*
1096                  * We must ensure the file is remapped before adding the space
1097                  * to ensure consistency with systems like OpenBSD where
1098                  * writes and mmaps are not consistent.
1099                  */
1100
1101                 /* We're ok if the mmap fails as we'll fallback to read/write */
1102                 tdb_mmap(tdb);
1103         }
1104
1105         /* form a new freelist record */
1106         memset(&rec,'\0',sizeof(rec));
1107         rec.rec_len = size - sizeof(rec);
1108
1109         /* link it into the free list */
1110         offset = tdb->map_size - size;
1111         if (tdb_free(tdb, offset, &rec) == -1)
1112                 goto fail;
1113
1114         tdb_unlock(tdb, -1, F_WRLCK);
1115         return 0;
1116  fail:
1117         tdb_unlock(tdb, -1, F_WRLCK);
1118         return -1;
1119 }
1120
1121 /* read/write a tdb_off_t */
1122 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1123 {
1124         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1125 }
1126
1127 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1128 {
1129         tdb_off_t off = *d;
1130         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1131 }
1132
1133
1134 /* read a lump of data, allocating the space for it */
1135 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1136 {
1137         unsigned char *buf;
1138
1139         /* some systems don't like zero length malloc */
1140         if (len == 0) {
1141                 len = 1;
1142         }
1143
1144         if (!(buf = (unsigned char *)malloc(len))) {
1145                 /* Ensure ecode is set for log fn. */
1146                 tdb->ecode = TDB_ERR_OOM;
1147                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1148                            len, strerror(errno)));
1149                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1150         }
1151         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1152                 SAFE_FREE(buf);
1153                 return NULL;
1154         }
1155         return buf;
1156 }
1157
1158 /* Give a piece of tdb data to a parser */
1159
1160 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1161                    tdb_off_t offset, tdb_len_t len,
1162                    int (*parser)(TDB_DATA key, TDB_DATA data,
1163                                  void *private_data),
1164                    void *private_data)
1165 {
1166         TDB_DATA data;
1167         int result;
1168
1169         data.dsize = len;
1170
1171         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1172                 /*
1173                  * Optimize by avoiding the malloc/memcpy/free, point the
1174                  * parser directly at the mmap area.
1175                  */
1176                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1177                         return -1;
1178                 }
1179                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1180                 return parser(key, data, private_data);
1181         }
1182
1183         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1184                 return -1;
1185         }
1186
1187         result = parser(key, data, private_data);
1188         free(data.dptr);
1189         return result;
1190 }
1191
1192 /* read/write a record */
1193 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1194 {
1195         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1196                 return -1;
1197         if (TDB_BAD_MAGIC(rec)) {
1198                 /* Ensure ecode is set for log fn. */
1199                 tdb->ecode = TDB_ERR_CORRUPT;
1200                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1201                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1202         }
1203         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1204 }
1205
1206 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1207 {
1208         struct list_struct r = *rec;
1209         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1210 }
1211
1212 static const struct tdb_methods io_methods = {
1213         tdb_read,
1214         tdb_write,
1215         tdb_next_hash_chain,
1216         tdb_oob,
1217         tdb_expand_file,
1218         tdb_brlock
1219 };
1220
1221 /*
1222   initialise the default methods table
1223 */
1224 void tdb_io_init(struct tdb_context *tdb)
1225 {
1226         tdb->methods = &io_methods;
1227 }
1228
1229 /* file: transaction.c */
1230
1231 /*
1232   transaction design:
1233
1234   - only allow a single transaction at a time per database. This makes
1235     using the transaction API simpler, as otherwise the caller would
1236     have to cope with temporary failures in transactions that conflict
1237     with other current transactions
1238
1239   - keep the transaction recovery information in the same file as the
1240     database, using a special 'transaction recovery' record pointed at
1241     by the header. This removes the need for extra journal files as
1242     used by some other databases
1243
1244   - dynamically allocated the transaction recover record, re-using it
1245     for subsequent transactions. If a larger record is needed then
1246     tdb_free() the old record to place it on the normal tdb freelist
1247     before allocating the new record
1248
1249   - during transactions, keep a linked list of writes all that have
1250     been performed by intercepting all tdb_write() calls. The hooked
1251     transaction versions of tdb_read() and tdb_write() check this
1252     linked list and try to use the elements of the list in preference
1253     to the real database.
1254
1255   - don't allow any locks to be held when a transaction starts,
1256     otherwise we can end up with deadlock (plus lack of lock nesting
1257     in posix locks would mean the lock is lost)
1258
1259   - if the caller gains a lock during the transaction but doesn't
1260     release it then fail the commit
1261
1262   - allow for nested calls to tdb_transaction_start(), re-using the
1263     existing transaction record. If the inner transaction is cancelled
1264     then a subsequent commit will fail
1265  
1266   - keep a mirrored copy of the tdb hash chain heads to allow for the
1267     fast hash heads scan on traverse, updating the mirrored copy in
1268     the transaction version of tdb_write
1269
1270   - allow callers to mix transaction and non-transaction use of tdb,
1271     although once a transaction is started then an exclusive lock is
1272     gained until the transaction is committed or cancelled
1273
1274   - the commit stategy involves first saving away all modified data
1275     into a linearised buffer in the transaction recovery area, then
1276     marking the transaction recovery area with a magic value to
1277     indicate a valid recovery record. In total 4 fsync/msync calls are
1278     needed per commit to prevent race conditions. It might be possible
1279     to reduce this to 3 or even 2 with some more work.
1280
1281   - check for a valid recovery record on open of the tdb, while the
1282     global lock is held. Automatically recover from the transaction
1283     recovery area if needed, then continue with the open as
1284     usual. This allows for smooth crash recovery with no administrator
1285     intervention.
1286
1287   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1288     still available, but no transaction recovery area is used and no
1289     fsync/msync calls are made.
1290
1291 */
1292
1293 struct tdb_transaction_el {
1294         struct tdb_transaction_el *next, *prev;
1295         tdb_off_t offset;
1296         tdb_len_t length;
1297         unsigned char *data;
1298 };
1299
1300 /*
1301   hold the context of any current transaction
1302 */
1303 struct tdb_transaction {
1304         /* we keep a mirrored copy of the tdb hash heads here so
1305            tdb_next_hash_chain() can operate efficiently */
1306         u32 *hash_heads;
1307
1308         /* the original io methods - used to do IOs to the real db */
1309         const struct tdb_methods *io_methods;
1310
1311         /* the list of transaction elements. We use a doubly linked
1312            list with a last pointer to allow us to keep the list
1313            ordered, with first element at the front of the list. It
1314            needs to be doubly linked as the read/write traversals need
1315            to be backwards, while the commit needs to be forwards */
1316         struct tdb_transaction_el *elements, *elements_last;
1317
1318         /* non-zero when an internal transaction error has
1319            occurred. All write operations will then fail until the
1320            transaction is ended */
1321         int transaction_error;
1322
1323         /* when inside a transaction we need to keep track of any
1324            nested tdb_transaction_start() calls, as these are allowed,
1325            but don't create a new transaction */
1326         int nesting;
1327
1328         /* old file size before transaction */
1329         tdb_len_t old_map_size;
1330 };
1331
1332
1333 /*
1334   read while in a transaction. We need to check first if the data is in our list
1335   of transaction elements, then if not do a real read
1336 */
1337 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
1338                             tdb_len_t len, int cv)
1339 {
1340         struct tdb_transaction_el *el;
1341
1342         /* we need to walk the list backwards to get the most recent data */
1343         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1344                 tdb_len_t partial;
1345
1346                 if (off+len <= el->offset) {
1347                         continue;
1348                 }
1349                 if (off >= el->offset + el->length) {
1350                         continue;
1351                 }
1352
1353                 /* an overlapping read - needs to be split into up to
1354                    2 reads and a memcpy */
1355                 if (off < el->offset) {
1356                         partial = el->offset - off;
1357                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1358                                 goto fail;
1359                         }
1360                         len -= partial;
1361                         off += partial;
1362                         buf = (void *)(partial + (char *)buf);
1363                 }
1364                 if (off + len <= el->offset + el->length) {
1365                         partial = len;
1366                 } else {
1367                         partial = el->offset + el->length - off;
1368                 }
1369                 memcpy(buf, el->data + (off - el->offset), partial);
1370                 if (cv) {
1371                         tdb_convert(buf, len);
1372                 }
1373                 len -= partial;
1374                 off += partial;
1375                 buf = (void *)(partial + (char *)buf);
1376                 
1377                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1378                         goto fail;
1379                 }
1380
1381                 return 0;
1382         }
1383
1384         /* its not in the transaction elements - do a real read */
1385         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1386
1387 fail:
1388         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1389         tdb->ecode = TDB_ERR_IO;
1390         tdb->transaction->transaction_error = 1;
1391         return -1;
1392 }
1393
1394
1395 /*
1396   write while in a transaction
1397 */
1398 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
1399                              const void *buf, tdb_len_t len)
1400 {
1401         struct tdb_transaction_el *el, *best_el=NULL;
1402
1403         if (len == 0) {
1404                 return 0;
1405         }
1406         
1407         /* if the write is to a hash head, then update the transaction
1408            hash heads */
1409         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1410             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1411                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1412                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1413         }
1414
1415         /* first see if we can replace an existing entry */
1416         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1417                 tdb_len_t partial;
1418
1419                 if (best_el == NULL && off == el->offset+el->length) {
1420                         best_el = el;
1421                 }
1422
1423                 if (off+len <= el->offset) {
1424                         continue;
1425                 }
1426                 if (off >= el->offset + el->length) {
1427                         continue;
1428                 }
1429
1430                 /* an overlapping write - needs to be split into up to
1431                    2 writes and a memcpy */
1432                 if (off < el->offset) {
1433                         partial = el->offset - off;
1434                         if (transaction_write(tdb, off, buf, partial) != 0) {
1435                                 goto fail;
1436                         }
1437                         len -= partial;
1438                         off += partial;
1439                         buf = (const void *)(partial + (const char *)buf);
1440                 }
1441                 if (off + len <= el->offset + el->length) {
1442                         partial = len;
1443                 } else {
1444                         partial = el->offset + el->length - off;
1445                 }
1446                 memcpy(el->data + (off - el->offset), buf, partial);
1447                 len -= partial;
1448                 off += partial;
1449                 buf = (const void *)(partial + (const char *)buf);
1450                 
1451                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1452                         goto fail;
1453                 }
1454
1455                 return 0;
1456         }
1457
1458         /* see if we can append the new entry to an existing entry */
1459         if (best_el && best_el->offset + best_el->length == off && 
1460             (off+len < tdb->transaction->old_map_size ||
1461              off > tdb->transaction->old_map_size)) {
1462                 unsigned char *data = best_el->data;
1463                 el = best_el;
1464                 el->data = (unsigned char *)realloc(el->data,
1465                                                     el->length + len);
1466                 if (el->data == NULL) {
1467                         tdb->ecode = TDB_ERR_OOM;
1468                         tdb->transaction->transaction_error = 1;
1469                         el->data = data;
1470                         return -1;
1471                 }
1472                 if (buf) {
1473                         memcpy(el->data + el->length, buf, len);
1474                 } else {
1475                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1476                 }
1477                 el->length += len;
1478                 return 0;
1479         }
1480
1481         /* add a new entry at the end of the list */
1482         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1483         if (el == NULL) {
1484                 tdb->ecode = TDB_ERR_OOM;
1485                 tdb->transaction->transaction_error = 1;                
1486                 return -1;
1487         }
1488         el->next = NULL;
1489         el->prev = tdb->transaction->elements_last;
1490         el->offset = off;
1491         el->length = len;
1492         el->data = (unsigned char *)malloc(len);
1493         if (el->data == NULL) {
1494                 free(el);
1495                 tdb->ecode = TDB_ERR_OOM;
1496                 tdb->transaction->transaction_error = 1;                
1497                 return -1;
1498         }
1499         if (buf) {
1500                 memcpy(el->data, buf, len);
1501         } else {
1502                 memset(el->data, TDB_PAD_BYTE, len);
1503         }
1504         if (el->prev) {
1505                 el->prev->next = el;
1506         } else {
1507                 tdb->transaction->elements = el;
1508         }
1509         tdb->transaction->elements_last = el;
1510         return 0;
1511
1512 fail:
1513         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1514         tdb->ecode = TDB_ERR_IO;
1515         tdb->transaction->transaction_error = 1;
1516         return -1;
1517 }
1518
1519 /*
1520   accelerated hash chain head search, using the cached hash heads
1521 */
1522 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1523 {
1524         u32 h = *chain;
1525         for (;h < tdb->header.hash_size;h++) {
1526                 /* the +1 takes account of the freelist */
1527                 if (0 != tdb->transaction->hash_heads[h+1]) {
1528                         break;
1529                 }
1530         }
1531         (*chain) = h;
1532 }
1533
1534 /*
1535   out of bounds check during a transaction
1536 */
1537 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1538 {
1539         if (len <= tdb->map_size) {
1540                 return 0;
1541         }
1542         return TDB_ERRCODE(TDB_ERR_IO, -1);
1543 }
1544
1545 /*
1546   transaction version of tdb_expand().
1547 */
1548 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
1549                                    tdb_off_t addition)
1550 {
1551         /* add a write to the transaction elements, so subsequent
1552            reads see the zero data */
1553         if (transaction_write(tdb, size, NULL, addition) != 0) {
1554                 return -1;
1555         }
1556
1557         return 0;
1558 }
1559
1560 /*
1561   brlock during a transaction - ignore them
1562 */
1563 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
1564                               int rw_type, int lck_type, int probe, size_t len)
1565 {
1566         return 0;
1567 }
1568
1569 static const struct tdb_methods transaction_methods = {
1570         transaction_read,
1571         transaction_write,
1572         transaction_next_hash_chain,
1573         transaction_oob,
1574         transaction_expand_file,
1575         transaction_brlock
1576 };
1577
1578
1579 /*
1580   start a tdb transaction. No token is returned, as only a single
1581   transaction is allowed to be pending per tdb_context
1582 */
1583 int tdb_transaction_start(struct tdb_context *tdb)
1584 {
1585         /* some sanity checks */
1586         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1587                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1588                 tdb->ecode = TDB_ERR_EINVAL;
1589                 return -1;
1590         }
1591
1592         /* cope with nested tdb_transaction_start() calls */
1593         if (tdb->transaction != NULL) {
1594                 tdb->transaction->nesting++;
1595                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
1596                          tdb->transaction->nesting));
1597                 return 0;
1598         }
1599
1600         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1601                 /* the caller must not have any locks when starting a
1602                    transaction as otherwise we'll be screwed by lack
1603                    of nested locks in posix */
1604                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1605                 tdb->ecode = TDB_ERR_LOCK;
1606                 return -1;
1607         }
1608
1609         if (tdb->travlocks.next != NULL) {
1610                 /* you cannot use transactions inside a traverse (although you can use
1611                    traverse inside a transaction) as otherwise you can end up with
1612                    deadlock */
1613                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1614                 tdb->ecode = TDB_ERR_LOCK;
1615                 return -1;
1616         }
1617
1618         tdb->transaction = (struct tdb_transaction *)
1619                 calloc(sizeof(struct tdb_transaction), 1);
1620         if (tdb->transaction == NULL) {
1621                 tdb->ecode = TDB_ERR_OOM;
1622                 return -1;
1623         }
1624
1625         /* get the transaction write lock. This is a blocking lock. As
1626            discussed with Volker, there are a number of ways we could
1627            make this async, which we will probably do in the future */
1628         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1629                 SAFE_FREE(tdb->transaction);
1630                 return -1;
1631         }
1632         
1633         /* get a read lock from the freelist to the end of file. This
1634            is upgraded to a write lock during the commit */
1635         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1636                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1637                 tdb->ecode = TDB_ERR_LOCK;
1638                 goto fail;
1639         }
1640
1641         /* setup a copy of the hash table heads so the hash scan in
1642            traverse can be fast */
1643         tdb->transaction->hash_heads = (u32 *)
1644                 calloc(tdb->header.hash_size+1, sizeof(u32));
1645         if (tdb->transaction->hash_heads == NULL) {
1646                 tdb->ecode = TDB_ERR_OOM;
1647                 goto fail;
1648         }
1649         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1650                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1651                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1652                 tdb->ecode = TDB_ERR_IO;
1653                 goto fail;
1654         }
1655
1656         /* make sure we know about any file expansions already done by
1657            anyone else */
1658         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1659         tdb->transaction->old_map_size = tdb->map_size;
1660
1661         /* finally hook the io methods, replacing them with
1662            transaction specific methods */
1663         tdb->transaction->io_methods = tdb->methods;
1664         tdb->methods = &transaction_methods;
1665
1666         /* by calling this transaction write here, we ensure that we don't grow the
1667            transaction linked list due to hash table updates */
1668         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
1669                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1670                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1671                 tdb->ecode = TDB_ERR_IO;
1672                 tdb->methods = tdb->transaction->io_methods;
1673                 goto fail;
1674         }
1675
1676         return 0;
1677         
1678 fail:
1679         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1680         tdb_transaction_unlock(tdb);
1681         SAFE_FREE(tdb->transaction->hash_heads);
1682         SAFE_FREE(tdb->transaction);
1683         return -1;
1684 }
1685
1686
1687 /*
1688   cancel the current transaction
1689 */
1690 int tdb_transaction_cancel(struct tdb_context *tdb)
1691 {       
1692         if (tdb->transaction == NULL) {
1693                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1694                 return -1;
1695         }
1696
1697         if (tdb->transaction->nesting != 0) {
1698                 tdb->transaction->transaction_error = 1;
1699                 tdb->transaction->nesting--;
1700                 return 0;
1701         }               
1702
1703         tdb->map_size = tdb->transaction->old_map_size;
1704
1705         /* free all the transaction elements */
1706         while (tdb->transaction->elements) {
1707                 struct tdb_transaction_el *el = tdb->transaction->elements;
1708                 tdb->transaction->elements = el->next;
1709                 free(el->data);
1710                 free(el);
1711         }
1712
1713         /* remove any global lock created during the transaction */
1714         if (tdb->global_lock.count != 0) {
1715                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1716                 tdb->global_lock.count = 0;
1717         }
1718
1719         /* remove any locks created during the transaction */
1720         if (tdb->num_locks != 0) {
1721                 int i;
1722                 for (i=0;i<tdb->num_lockrecs;i++) {
1723                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1724                                    F_UNLCK,F_SETLKW, 0, 1);
1725                 }
1726                 tdb->num_locks = 0;
1727                 tdb->num_lockrecs = 0;
1728                 SAFE_FREE(tdb->lockrecs);
1729         }
1730
1731         /* restore the normal io methods */
1732         tdb->methods = tdb->transaction->io_methods;
1733
1734         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1735         tdb_transaction_unlock(tdb);
1736         SAFE_FREE(tdb->transaction->hash_heads);
1737         SAFE_FREE(tdb->transaction);
1738         
1739         return 0;
1740 }
1741
1742 /*
1743   sync to disk
1744 */
1745 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1746 {       
1747         if (fsync(tdb->fd) != 0) {
1748                 tdb->ecode = TDB_ERR_IO;
1749                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1750                 return -1;
1751         }
1752 #ifdef MS_SYNC
1753         if (tdb->map_ptr) {
1754                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1755                 if (msync(moffset + (char *)tdb->map_ptr, 
1756                           length + (offset - moffset), MS_SYNC) != 0) {
1757                         tdb->ecode = TDB_ERR_IO;
1758                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1759                                  strerror(errno)));
1760                         return -1;
1761                 }
1762         }
1763 #endif
1764         return 0;
1765 }
1766
1767
1768 /*
1769   work out how much space the linearised recovery data will consume
1770 */
1771 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1772 {
1773         struct tdb_transaction_el *el;
1774         tdb_len_t recovery_size = 0;
1775
1776         recovery_size = sizeof(u32);
1777         for (el=tdb->transaction->elements;el;el=el->next) {
1778                 if (el->offset >= tdb->transaction->old_map_size) {
1779                         continue;
1780                 }
1781                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1782         }
1783
1784         return recovery_size;
1785 }
1786
1787 /*
1788   allocate the recovery area, or use an existing recovery area if it is
1789   large enough
1790 */
1791 static int tdb_recovery_allocate(struct tdb_context *tdb, 
1792                                  tdb_len_t *recovery_size,
1793                                  tdb_off_t *recovery_offset,
1794                                  tdb_len_t *recovery_max_size)
1795 {
1796         struct list_struct rec;
1797         const struct tdb_methods *methods = tdb->transaction->io_methods;
1798         tdb_off_t recovery_head;
1799
1800         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1801                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1802                 return -1;
1803         }
1804
1805         rec.rec_len = 0;
1806
1807         if (recovery_head != 0 && 
1808             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1809                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1810                 return -1;
1811         }
1812
1813         *recovery_size = tdb_recovery_size(tdb);
1814
1815         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1816                 /* it fits in the existing area */
1817                 *recovery_max_size = rec.rec_len;
1818                 *recovery_offset = recovery_head;
1819                 return 0;
1820         }
1821
1822         /* we need to free up the old recovery area, then allocate a
1823            new one at the end of the file. Note that we cannot use
1824            tdb_allocate() to allocate the new one as that might return
1825            us an area that is being currently used (as of the start of
1826            the transaction) */
1827         if (recovery_head != 0) {
1828                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1829                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1830                         return -1;
1831                 }
1832         }
1833
1834         /* the tdb_free() call might have increased the recovery size */
1835         *recovery_size = tdb_recovery_size(tdb);
1836
1837         /* round up to a multiple of page size */
1838         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1839         *recovery_offset = tdb->map_size;
1840         recovery_head = *recovery_offset;
1841
1842         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
1843                                      (tdb->map_size - tdb->transaction->old_map_size) +
1844                                      sizeof(rec) + *recovery_max_size) == -1) {
1845                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1846                 return -1;
1847         }
1848
1849         /* remap the file (if using mmap) */
1850         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1851
1852         /* we have to reset the old map size so that we don't try to expand the file
1853            again in the transaction commit, which would destroy the recovery area */
1854         tdb->transaction->old_map_size = tdb->map_size;
1855
1856         /* write the recovery header offset and sync - we can sync without a race here
1857            as the magic ptr in the recovery record has not been set */
1858         CONVERT(recovery_head);
1859         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
1860                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1861                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1862                 return -1;
1863         }
1864
1865         return 0;
1866 }
1867
1868
1869 /*
1870   setup the recovery data that will be used on a crash during commit
1871 */
1872 static int transaction_setup_recovery(struct tdb_context *tdb, 
1873                                       tdb_off_t *magic_offset)
1874 {
1875         struct tdb_transaction_el *el;
1876         tdb_len_t recovery_size;
1877         unsigned char *data, *p;
1878         const struct tdb_methods *methods = tdb->transaction->io_methods;
1879         struct list_struct *rec;
1880         tdb_off_t recovery_offset, recovery_max_size;
1881         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1882         u32 magic, tailer;
1883
1884         /*
1885           check that the recovery area has enough space
1886         */
1887         if (tdb_recovery_allocate(tdb, &recovery_size, 
1888                                   &recovery_offset, &recovery_max_size) == -1) {
1889                 return -1;
1890         }
1891
1892         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1893         if (data == NULL) {
1894                 tdb->ecode = TDB_ERR_OOM;
1895                 return -1;
1896         }
1897
1898         rec = (struct list_struct *)data;
1899         memset(rec, 0, sizeof(*rec));
1900
1901         rec->magic    = 0;
1902         rec->data_len = recovery_size;
1903         rec->rec_len  = recovery_max_size;
1904         rec->key_len  = old_map_size;
1905         CONVERT(rec);
1906
1907         /* build the recovery data into a single blob to allow us to do a single
1908            large write, which should be more efficient */
1909         p = data + sizeof(*rec);
1910         for (el=tdb->transaction->elements;el;el=el->next) {
1911                 if (el->offset >= old_map_size) {
1912                         continue;
1913                 }
1914                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1915                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1916                         free(data);
1917                         tdb->ecode = TDB_ERR_CORRUPT;
1918                         return -1;
1919                 }
1920                 memcpy(p, &el->offset, 4);
1921                 memcpy(p+4, &el->length, 4);
1922                 if (DOCONV()) {
1923                         tdb_convert(p, 8);
1924                 }
1925                 /* the recovery area contains the old data, not the
1926                    new data, so we have to call the original tdb_read
1927                    method to get it */
1928                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1929                         free(data);
1930                         tdb->ecode = TDB_ERR_IO;
1931                         return -1;
1932                 }
1933                 p += 8 + el->length;
1934         }
1935
1936         /* and the tailer */
1937         tailer = sizeof(*rec) + recovery_max_size;
1938         memcpy(p, &tailer, 4);
1939         CONVERT(p);
1940
1941         /* write the recovery data to the recovery area */
1942         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1943                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1944                 free(data);
1945                 tdb->ecode = TDB_ERR_IO;
1946                 return -1;
1947         }
1948
1949         /* as we don't have ordered writes, we have to sync the recovery
1950            data before we update the magic to indicate that the recovery
1951            data is present */
1952         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1953                 free(data);
1954                 return -1;
1955         }
1956
1957         free(data);
1958
1959         magic = TDB_RECOVERY_MAGIC;
1960         CONVERT(magic);
1961
1962         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1963
1964         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1965                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1966                 tdb->ecode = TDB_ERR_IO;
1967                 return -1;
1968         }
1969
1970         /* ensure the recovery magic marker is on disk */
1971         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1972                 return -1;
1973         }
1974
1975         return 0;
1976 }
1977
1978 /*
1979   commit the current transaction
1980 */
1981 int tdb_transaction_commit(struct tdb_context *tdb)
1982 {       
1983         const struct tdb_methods *methods;
1984         tdb_off_t magic_offset = 0;
1985         u32 zero = 0;
1986
1987         if (tdb->transaction == NULL) {
1988                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1989                 return -1;
1990         }
1991
1992         if (tdb->transaction->transaction_error) {
1993                 tdb->ecode = TDB_ERR_IO;
1994                 tdb_transaction_cancel(tdb);
1995                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1996                 return -1;
1997         }
1998
1999         if (tdb->transaction->nesting != 0) {
2000                 tdb->transaction->nesting--;
2001                 return 0;
2002         }               
2003
2004         /* check for a null transaction */
2005         if (tdb->transaction->elements == NULL) {
2006                 tdb_transaction_cancel(tdb);
2007                 return 0;
2008         }
2009
2010         methods = tdb->transaction->io_methods;
2011         
2012         /* if there are any locks pending then the caller has not
2013            nested their locks properly, so fail the transaction */
2014         if (tdb->num_locks || tdb->global_lock.count) {
2015                 tdb->ecode = TDB_ERR_LOCK;
2016                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2017                 tdb_transaction_cancel(tdb);
2018                 return -1;
2019         }
2020
2021         /* upgrade the main transaction lock region to a write lock */
2022         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2023                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2024                 tdb->ecode = TDB_ERR_LOCK;
2025                 tdb_transaction_cancel(tdb);
2026                 return -1;
2027         }
2028
2029         /* get the global lock - this prevents new users attaching to the database
2030            during the commit */
2031         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2032                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2033                 tdb->ecode = TDB_ERR_LOCK;
2034                 tdb_transaction_cancel(tdb);
2035                 return -1;
2036         }
2037
2038         if (!(tdb->flags & TDB_NOSYNC)) {
2039                 /* write the recovery data to the end of the file */
2040                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2041                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2042                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2043                         tdb_transaction_cancel(tdb);
2044                         return -1;
2045                 }
2046         }
2047
2048         /* expand the file to the new size if needed */
2049         if (tdb->map_size != tdb->transaction->old_map_size) {
2050                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
2051                                              tdb->map_size - 
2052                                              tdb->transaction->old_map_size) == -1) {
2053                         tdb->ecode = TDB_ERR_IO;
2054                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2055                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2056                         tdb_transaction_cancel(tdb);
2057                         return -1;
2058                 }
2059                 tdb->map_size = tdb->transaction->old_map_size;
2060                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2061         }
2062
2063         /* perform all the writes */
2064         while (tdb->transaction->elements) {
2065                 struct tdb_transaction_el *el = tdb->transaction->elements;
2066
2067                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2068                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2069                         
2070                         /* we've overwritten part of the data and
2071                            possibly expanded the file, so we need to
2072                            run the crash recovery code */
2073                         tdb->methods = methods;
2074                         tdb_transaction_recover(tdb); 
2075
2076                         tdb_transaction_cancel(tdb);
2077                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2078
2079                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2080                         return -1;
2081                 }
2082                 tdb->transaction->elements = el->next;
2083                 free(el->data); 
2084                 free(el);
2085         } 
2086
2087         if (!(tdb->flags & TDB_NOSYNC)) {
2088                 /* ensure the new data is on disk */
2089                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2090                         return -1;
2091                 }
2092
2093                 /* remove the recovery marker */
2094                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2095                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2096                         return -1;
2097                 }
2098
2099                 /* ensure the recovery marker has been removed on disk */
2100                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2101                         return -1;
2102                 }
2103         }
2104
2105         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2106
2107         /*
2108           TODO: maybe write to some dummy hdr field, or write to magic
2109           offset without mmap, before the last sync, instead of the
2110           utime() call
2111         */
2112
2113         /* on some systems (like Linux 2.6.x) changes via mmap/msync
2114            don't change the mtime of the file, this means the file may
2115            not be backed up (as tdb rounding to block sizes means that
2116            file size changes are quite rare too). The following forces
2117            mtime changes when a transaction completes */
2118 #ifdef HAVE_UTIME
2119         utime(tdb->name, NULL);
2120 #endif
2121
2122         /* use a transaction cancel to free memory and remove the
2123            transaction locks */
2124         tdb_transaction_cancel(tdb);
2125         return 0;
2126 }
2127
2128
2129 /*
2130   recover from an aborted transaction. Must be called with exclusive
2131   database write access already established (including the global
2132   lock to prevent new processes attaching)
2133 */
2134 int tdb_transaction_recover(struct tdb_context *tdb)
2135 {
2136         tdb_off_t recovery_head, recovery_eof;
2137         unsigned char *data, *p;
2138         u32 zero = 0;
2139         struct list_struct rec;
2140
2141         /* find the recovery area */
2142         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2143                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2144                 tdb->ecode = TDB_ERR_IO;
2145                 return -1;
2146         }
2147
2148         if (recovery_head == 0) {
2149                 /* we have never allocated a recovery record */
2150                 return 0;
2151         }
2152
2153         /* read the recovery record */
2154         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
2155                                    sizeof(rec), DOCONV()) == -1) {
2156                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
2157                 tdb->ecode = TDB_ERR_IO;
2158                 return -1;
2159         }
2160
2161         if (rec.magic != TDB_RECOVERY_MAGIC) {
2162                 /* there is no valid recovery data */
2163                 return 0;
2164         }
2165
2166         if (tdb->read_only) {
2167                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2168                 tdb->ecode = TDB_ERR_CORRUPT;
2169                 return -1;
2170         }
2171
2172         recovery_eof = rec.key_len;
2173
2174         data = (unsigned char *)malloc(rec.data_len);
2175         if (data == NULL) {
2176                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
2177                 tdb->ecode = TDB_ERR_OOM;
2178                 return -1;
2179         }
2180
2181         /* read the full recovery data */
2182         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2183                                    rec.data_len, 0) == -1) {
2184                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
2185                 tdb->ecode = TDB_ERR_IO;
2186                 return -1;
2187         }
2188
2189         /* recover the file data */
2190         p = data;
2191         while (p+8 < data + rec.data_len) {
2192                 u32 ofs, len;
2193                 if (DOCONV()) {
2194                         tdb_convert(p, 8);
2195                 }
2196                 memcpy(&ofs, p, 4);
2197                 memcpy(&len, p+4, 4);
2198
2199                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2200                         free(data);
2201                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2202                         tdb->ecode = TDB_ERR_IO;
2203                         return -1;
2204                 }
2205                 p += 8 + len;
2206         }
2207
2208         free(data);
2209
2210         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2211                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2212                 tdb->ecode = TDB_ERR_IO;
2213                 return -1;
2214         }
2215
2216         /* if the recovery area is after the recovered eof then remove it */
2217         if (recovery_eof <= recovery_head) {
2218                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2219                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2220                         tdb->ecode = TDB_ERR_IO;
2221                         return -1;                      
2222                 }
2223         }
2224
2225         /* remove the recovery magic */
2226         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
2227                           &zero) == -1) {
2228                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2229                 tdb->ecode = TDB_ERR_IO;
2230                 return -1;                      
2231         }
2232         
2233         /* reduce the file size to the old size */
2234         tdb_munmap(tdb);
2235         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2236                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2237                 tdb->ecode = TDB_ERR_IO;
2238                 return -1;                      
2239         }
2240         tdb->map_size = recovery_eof;
2241         tdb_mmap(tdb);
2242
2243         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2244                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2245                 tdb->ecode = TDB_ERR_IO;
2246                 return -1;
2247         }
2248
2249         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
2250                  recovery_eof));
2251
2252         /* all done */
2253         return 0;
2254 }
2255
2256 /* file: freelist.c */
2257
2258 /* read a freelist record and check for simple errors */
2259 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2260 {
2261         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2262                 return -1;
2263
2264         if (rec->magic == TDB_MAGIC) {
2265                 /* this happens when a app is showdown while deleting a record - we should
2266                    not completely fail when this happens */
2267                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
2268                          rec->magic, off));
2269                 rec->magic = TDB_FREE_MAGIC;
2270                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2271                         return -1;
2272         }
2273
2274         if (rec->magic != TDB_FREE_MAGIC) {
2275                 /* Ensure ecode is set for log fn. */
2276                 tdb->ecode = TDB_ERR_CORRUPT;
2277                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n", 
2278                            rec->magic, off));
2279                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2280         }
2281         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2282                 return -1;
2283         return 0;
2284 }
2285
2286
2287
2288 /* Remove an element from the freelist.  Must have alloc lock. */
2289 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2290 {
2291         tdb_off_t last_ptr, i;
2292
2293         /* read in the freelist top */
2294         last_ptr = FREELIST_TOP;
2295         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2296                 if (i == off) {
2297                         /* We've found it! */
2298                         return tdb_ofs_write(tdb, last_ptr, &next);
2299                 }
2300                 /* Follow chain (next offset is at start of record) */
2301                 last_ptr = i;
2302         }
2303         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2304         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2305 }
2306
2307
2308 /* update a record tailer (must hold allocation lock) */
2309 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2310                          const struct list_struct *rec)
2311 {
2312         tdb_off_t totalsize;
2313
2314         /* Offset of tailer from record header */
2315         totalsize = sizeof(*rec) + rec->rec_len;
2316         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2317                          &totalsize);
2318 }
2319
2320 /* Add an element into the freelist. Merge adjacent records if
2321    neccessary. */
2322 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2323 {
2324         tdb_off_t right, left;
2325
2326         /* Allocation and tailer lock */
2327         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2328                 return -1;
2329
2330         /* set an initial tailer, so if we fail we don't leave a bogus record */
2331         if (update_tailer(tdb, offset, rec) != 0) {
2332                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2333                 goto fail;
2334         }
2335
2336         /* Look right first (I'm an Australian, dammit) */
2337         right = offset + sizeof(*rec) + rec->rec_len;
2338         if (right + sizeof(*rec) <= tdb->map_size) {
2339                 struct list_struct r;
2340
2341                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2342                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2343                         goto left;
2344                 }
2345
2346                 /* If it's free, expand to include it. */
2347                 if (r.magic == TDB_FREE_MAGIC) {
2348                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2349                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2350                                 goto left;
2351                         }
2352                         rec->rec_len += sizeof(r) + r.rec_len;
2353                 }
2354         }
2355
2356 left:
2357         /* Look left */
2358         left = offset - sizeof(tdb_off_t);
2359         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2360                 struct list_struct l;
2361                 tdb_off_t leftsize;
2362                 
2363                 /* Read in tailer and jump back to header */
2364                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2365                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2366                         goto update;
2367                 }
2368
2369                 /* it could be uninitialised data */
2370                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2371                         goto update;
2372                 }
2373
2374                 left = offset - leftsize;
2375
2376                 /* Now read in record */
2377                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2378                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2379                         goto update;
2380                 }
2381
2382                 /* If it's free, expand to include it. */
2383                 if (l.magic == TDB_FREE_MAGIC) {
2384                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2385                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2386                                 goto update;
2387                         } else {
2388                                 offset = left;
2389                                 rec->rec_len += leftsize;
2390                         }
2391                 }
2392         }
2393
2394 update:
2395         if (update_tailer(tdb, offset, rec) == -1) {
2396                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2397                 goto fail;
2398         }
2399
2400         /* Now, prepend to free list */
2401         rec->magic = TDB_FREE_MAGIC;
2402
2403         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2404             tdb_rec_write(tdb, offset, rec) == -1 ||
2405             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2406                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2407                 goto fail;
2408         }
2409
2410         /* And we're done. */
2411         tdb_unlock(tdb, -1, F_WRLCK);
2412         return 0;
2413
2414  fail:
2415         tdb_unlock(tdb, -1, F_WRLCK);
2416         return -1;
2417 }
2418
2419
2420 /* 
2421    the core of tdb_allocate - called when we have decided which
2422    free list entry to use
2423  */
2424 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2425                                 struct list_struct *rec, tdb_off_t last_ptr)
2426 {
2427         struct list_struct newrec;
2428         tdb_off_t newrec_ptr;
2429
2430         memset(&newrec, '\0', sizeof(newrec));
2431
2432         /* found it - now possibly split it up  */
2433         if (rec->rec_len > length + MIN_REC_SIZE) {
2434                 /* Length of left piece */
2435                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2436                 
2437                 /* Right piece to go on free list */
2438                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2439                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2440                 
2441                 /* And left record is shortened */
2442                 rec->rec_len = length;
2443         } else {
2444                 newrec_ptr = 0;
2445         }
2446         
2447         /* Remove allocated record from the free list */
2448         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2449                 return 0;
2450         }
2451         
2452         /* Update header: do this before we drop alloc
2453            lock, otherwise tdb_free() might try to
2454            merge with us, thinking we're free.
2455            (Thanks Jeremy Allison). */
2456         rec->magic = TDB_MAGIC;
2457         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2458                 return 0;
2459         }
2460         
2461         /* Did we create new block? */
2462         if (newrec_ptr) {
2463                 /* Update allocated record tailer (we
2464                    shortened it). */
2465                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2466                         return 0;
2467                 }
2468                 
2469                 /* Free new record */
2470                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2471                         return 0;
2472                 }
2473         }
2474         
2475         /* all done - return the new record offset */
2476         return rec_ptr;
2477 }
2478
2479 /* allocate some space from the free list. The offset returned points
2480    to a unconnected list_struct within the database with room for at
2481    least length bytes of total data
2482
2483    0 is returned if the space could not be allocated
2484  */
2485 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2486 {
2487         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2488         struct {
2489                 tdb_off_t rec_ptr, last_ptr;
2490                 tdb_len_t rec_len;
2491         } bestfit;
2492
2493         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2494                 return 0;
2495
2496         /* Extra bytes required for tailer */
2497         length += sizeof(tdb_off_t);
2498
2499  again:
2500         last_ptr = FREELIST_TOP;
2501
2502         /* read in the freelist top */
2503         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2504                 goto fail;
2505
2506         bestfit.rec_ptr = 0;
2507         bestfit.last_ptr = 0;
2508         bestfit.rec_len = 0;
2509
2510         /* 
2511            this is a best fit allocation strategy. Originally we used
2512            a first fit strategy, but it suffered from massive fragmentation
2513            issues when faced with a slowly increasing record size.
2514          */
2515         while (rec_ptr) {
2516                 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2517                         goto fail;
2518                 }
2519
2520                 if (rec->rec_len >= length) {
2521                         if (bestfit.rec_ptr == 0 ||
2522                             rec->rec_len < bestfit.rec_len) {
2523                                 bestfit.rec_len = rec->rec_len;
2524                                 bestfit.rec_ptr = rec_ptr;
2525                                 bestfit.last_ptr = last_ptr;
2526                                 /* consider a fit to be good enough if
2527                                    we aren't wasting more than half
2528                                    the space */
2529                                 if (bestfit.rec_len < 2*length) {
2530                                         break;
2531                                 }
2532                         }
2533                 }
2534
2535                 /* move to the next record */
2536                 last_ptr = rec_ptr;
2537                 rec_ptr = rec->next;
2538         }
2539
2540         if (bestfit.rec_ptr != 0) {
2541                 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2542                         goto fail;
2543                 }
2544
2545                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2546                 tdb_unlock(tdb, -1, F_WRLCK);
2547                 return newrec_ptr;
2548         }
2549
2550         /* we didn't find enough space. See if we can expand the
2551            database and if we can then try again */
2552         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2553                 goto again;
2554  fail:
2555         tdb_unlock(tdb, -1, F_WRLCK);
2556         return 0;
2557 }
2558
2559 /* file: freelistcheck.c */
2560
2561 /* Check the freelist is good and contains no loops.
2562    Very memory intensive - only do this as a consistency
2563    checker. Heh heh - uses an in memory tdb as the storage
2564    for the "seen" record list. For some reason this strikes
2565    me as extremely clever as I don't have to write another tree
2566    data structure implementation :-).
2567  */
2568
2569 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2570 {
2571         TDB_DATA key, data;
2572
2573         memset(&data, '\0', sizeof(data));
2574         key.dptr = (unsigned char *)&rec_ptr;
2575         key.dsize = sizeof(rec_ptr);
2576         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2577 }
2578
2579 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2580 {
2581         struct tdb_context *mem_tdb = NULL;
2582         struct list_struct rec;
2583         tdb_off_t rec_ptr, last_ptr;
2584         int ret = -1;
2585
2586         *pnum_entries = 0;
2587
2588         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2589                                 TDB_INTERNAL, O_RDWR, 0600);
2590         if (!mem_tdb) {
2591                 return -1;
2592         }
2593
2594         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2595                 tdb_close(mem_tdb);
2596                 return 0;
2597         }
2598
2599         last_ptr = FREELIST_TOP;
2600
2601         /* Store the FREELIST_TOP record. */
2602         if (seen_insert(mem_tdb, last_ptr) == -1) {
2603                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2604                 goto fail;
2605         }
2606
2607         /* read in the freelist top */
2608         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2609                 goto fail;
2610         }
2611
2612         while (rec_ptr) {
2613
2614                 /* If we can't store this record (we've seen it
2615                    before) then the free list has a loop and must
2616                    be corrupt. */
2617
2618                 if (seen_insert(mem_tdb, rec_ptr)) {
2619                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2620                         goto fail;
2621                 }
2622
2623                 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2624                         goto fail;
2625                 }
2626
2627                 /* move to the next record */
2628                 last_ptr = rec_ptr;
2629                 rec_ptr = rec.next;
2630                 *pnum_entries += 1;
2631         }
2632
2633         ret = 0;
2634
2635   fail:
2636
2637         tdb_close(mem_tdb);
2638         tdb_unlock(tdb, -1, F_WRLCK);
2639         return ret;
2640 }
2641
2642 /* file: traverse.c */
2643
2644 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2645 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2646                          struct list_struct *rec)
2647 {
2648         int want_next = (tlock->off != 0);
2649
2650         /* Lock each chain from the start one. */
2651         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2652                 if (!tlock->off && tlock->hash != 0) {
2653                         /* this is an optimisation for the common case where
2654                            the hash chain is empty, which is particularly
2655                            common for the use of tdb with ldb, where large
2656                            hashes are used. In that case we spend most of our
2657                            time in tdb_brlock(), locking empty hash chains.
2658                            
2659                            To avoid this, we do an unlocked pre-check to see
2660                            if the hash chain is empty before starting to look
2661                            inside it. If it is empty then we can avoid that
2662                            hash chain. If it isn't empty then we can't believe
2663                            the value we get back, as we read it without a
2664                            lock, so instead we get the lock and re-fetch the
2665                            value below.
2666                            
2667                            Notice that not doing this optimisation on the
2668                            first hash chain is critical. We must guarantee
2669                            that we have done at least one fcntl lock at the
2670                            start of a search to guarantee that memory is
2671                            coherent on SMP systems. If records are added by
2672                            others during the search then thats OK, and we
2673                            could possibly miss those with this trick, but we
2674                            could miss them anyway without this trick, so the
2675                            semantics don't change.
2676                            
2677                            With a non-indexed ldb search this trick gains us a
2678                            factor of around 80 in speed on a linux 2.6.x
2679                            system (testing using ldbtest).
2680                         */
2681                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2682                         if (tlock->hash == tdb->header.hash_size) {
2683                                 continue;
2684                         }
2685                 }
2686
2687                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2688                         return -1;
2689
2690                 /* No previous record?  Start at top of chain. */
2691                 if (!tlock->off) {
2692                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2693                                      &tlock->off) == -1)
2694                                 goto fail;
2695                 } else {
2696                         /* Otherwise unlock the previous record. */
2697                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2698                                 goto fail;
2699                 }
2700
2701                 if (want_next) {
2702                         /* We have offset of old record: grab next */
2703                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2704                                 goto fail;
2705                         tlock->off = rec->next;
2706                 }
2707
2708                 /* Iterate through chain */
2709                 while( tlock->off) {
2710                         tdb_off_t current;
2711                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2712                                 goto fail;
2713
2714                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2715                         if (tlock->off == rec->next) {
2716                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2717                                 goto fail;
2718                         }
2719
2720                         if (!TDB_DEAD(rec)) {
2721                                 /* Woohoo: we found one! */
2722                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2723                                         goto fail;
2724                                 return tlock->off;
2725                         }
2726
2727                         /* Try to clean dead ones from old traverses */
2728                         current = tlock->off;
2729                         tlock->off = rec->next;
2730                         if (!(tdb->read_only || tdb->traverse_read) && 
2731                             tdb_do_delete(tdb, current, rec) != 0)
2732                                 goto fail;
2733                 }
2734                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2735                 want_next = 0;
2736         }
2737         /* We finished iteration without finding anything */
2738         return TDB_ERRCODE(TDB_SUCCESS, 0);
2739
2740  fail:
2741         tlock->off = 0;
2742         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2743                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2744         return -1;
2745 }
2746
2747 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2748    return -1 on error or the record count traversed
2749    if fn is NULL then it is not called
2750    a non-zero return value from fn() indicates that the traversal should stop
2751   */
2752 static int tdb_traverse_internal(struct tdb_context *tdb, 
2753                                  tdb_traverse_func fn, void *private_data,
2754                                  struct tdb_traverse_lock *tl)
2755 {
2756         TDB_DATA key, dbuf;
2757         struct list_struct rec;
2758         int ret, count = 0;
2759
2760         /* This was in the initializaton, above, but the IRIX compiler
2761          * did not like it.  crh
2762          */
2763         tl->next = tdb->travlocks.next;
2764
2765         /* fcntl locks don't stack: beware traverse inside traverse */
2766         tdb->travlocks.next = tl;
2767
2768         /* tdb_next_lock places locks on the record returned, and its chain */
2769         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2770                 count++;
2771                 /* now read the full record */
2772                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec), 
2773                                           rec.key_len + rec.data_len);
2774                 if (!key.dptr) {
2775                         ret = -1;
2776                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2777                                 goto out;
2778                         if (tdb_unlock_record(tdb, tl->off) != 0)
2779                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2780                         goto out;
2781                 }
2782                 key.dsize = rec.key_len;
2783                 dbuf.dptr = key.dptr + rec.key_len;
2784                 dbuf.dsize = rec.data_len;
2785
2786                 /* Drop chain lock, call out */
2787                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2788                         ret = -1;
2789                         SAFE_FREE(key.dptr);
2790                         goto out;
2791                 }
2792                 if (fn && fn(tdb, key, dbuf, private_data)) {
2793                         /* They want us to terminate traversal */
2794                         ret = count;
2795                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2796                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2797                                 ret = -1;
2798                         }
2799                         SAFE_FREE(key.dptr);
2800                         goto out;
2801                 }
2802                 SAFE_FREE(key.dptr);
2803         }
2804 out:
2805         tdb->travlocks.next = tl->next;
2806         if (ret < 0)
2807                 return -1;
2808         else
2809                 return count;
2810 }
2811
2812
2813 /*
2814   a write style traverse - temporarily marks the db read only
2815 */
2816 int tdb_traverse_read(struct tdb_context *tdb, 
2817                       tdb_traverse_func fn, void *private_data)
2818 {
2819         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2820         int ret;
2821
2822         /* we need to get a read lock on the transaction lock here to
2823            cope with the lock ordering semantics of solaris10 */
2824         if (tdb_transaction_lock(tdb, F_RDLCK)) {
2825                 return -1;
2826         }
2827
2828         tdb->traverse_read++;
2829         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2830         tdb->traverse_read--;
2831
2832         tdb_transaction_unlock(tdb);
2833
2834         return ret;
2835 }
2836
2837 /*
2838   a write style traverse - needs to get the transaction lock to
2839   prevent deadlocks
2840 */
2841 int tdb_traverse(struct tdb_context *tdb, 
2842                  tdb_traverse_func fn, void *private_data)
2843 {
2844         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2845         int ret;
2846
2847         if (tdb->read_only || tdb->traverse_read) {
2848                 return tdb_traverse_read(tdb, fn, private_data);
2849         }
2850         
2851         if (tdb_transaction_lock(tdb, F_WRLCK)) {
2852                 return -1;
2853         }
2854
2855         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2856
2857         tdb_transaction_unlock(tdb);
2858
2859         return ret;
2860 }
2861
2862
2863 /* find the first entry in the database and return its key */
2864 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2865 {
2866         TDB_DATA key;
2867         struct list_struct rec;
2868
2869         /* release any old lock */
2870         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2871                 return tdb_null;
2872         tdb->travlocks.off = tdb->travlocks.hash = 0;
2873         tdb->travlocks.lock_rw = F_RDLCK;
2874
2875         /* Grab first record: locks chain and returned record. */
2876         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2877                 return tdb_null;
2878         /* now read the key */
2879         key.dsize = rec.key_len;
2880         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2881
2882         /* Unlock the hash chain of the record we just read. */
2883         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2884                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2885         return key;
2886 }
2887
2888 /* find the next entry in the database, returning its key */
2889 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2890 {
2891         u32 oldhash;
2892         TDB_DATA key = tdb_null;
2893         struct list_struct rec;
2894         unsigned char *k = NULL;
2895
2896         /* Is locked key the old key?  If so, traverse will be reliable. */
2897         if (tdb->travlocks.off) {
2898                 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2899                         return tdb_null;
2900                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2901                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2902                                             rec.key_len))
2903                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2904                         /* No, it wasn't: unlock it and start from scratch */
2905                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2906                                 SAFE_FREE(k);
2907                                 return tdb_null;
2908                         }
2909                         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2910                                 SAFE_FREE(k);
2911                                 return tdb_null;
2912                         }
2913                         tdb->travlocks.off = 0;
2914                 }
2915
2916                 SAFE_FREE(k);
2917         }
2918
2919         if (!tdb->travlocks.off) {
2920                 /* No previous element: do normal find, and lock record */
2921                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2922                 if (!tdb->travlocks.off)
2923                         return tdb_null;
2924                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2925                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2926                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2927                         return tdb_null;
2928                 }
2929         }
2930         oldhash = tdb->travlocks.hash;
2931
2932         /* Grab next record: locks chain and returned record,
2933            unlocks old record */
2934         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2935                 key.dsize = rec.key_len;
2936                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2937                                           key.dsize);
2938                 /* Unlock the chain of this new record */
2939                 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2940                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2941         }
2942         /* Unlock the chain of old record */
2943         if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2944                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2945         return key;
2946 }
2947
2948 /* file: dump.c */
2949
2950 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2951                                  tdb_off_t offset)
2952 {
2953         struct list_struct rec;
2954         tdb_off_t tailer_ofs, tailer;
2955
2956         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec, 
2957                                    sizeof(rec), DOCONV()) == -1) {
2958                 printf("ERROR: failed to read record at %u\n", offset);
2959                 return 0;
2960         }
2961
2962         printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2963                "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2964                hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2965                rec.full_hash, rec.magic);
2966
2967         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2968
2969         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2970                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2971                 return rec.next;
2972         }
2973
2974         if (tailer != rec.rec_len + sizeof(rec)) {
2975                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2976                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2977         }
2978         return rec.next;
2979 }
2980
2981 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2982 {
2983         tdb_off_t rec_ptr, top;
2984
2985         top = TDB_HASH_TOP(i);
2986
2987         if (tdb_lock(tdb, i, F_WRLCK) != 0)
2988                 return -1;
2989
2990         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2991                 return tdb_unlock(tdb, i, F_WRLCK);
2992
2993         if (rec_ptr)
2994                 printf("hash=%d\n", i);
2995
2996         while (rec_ptr) {
2997                 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2998         }
2999
3000         return tdb_unlock(tdb, i, F_WRLCK);
3001 }
3002
3003 void tdb_dump_all(struct tdb_context *tdb)
3004 {
3005         int i;
3006         for (i=0;i<tdb->header.hash_size;i++) {
3007                 tdb_dump_chain(tdb, i);
3008         }
3009         printf("freelist:\n");
3010         tdb_dump_chain(tdb, -1);
3011 }
3012
3013 int tdb_printfreelist(struct tdb_context *tdb)
3014 {
3015         int ret;
3016         long total_free = 0;
3017         tdb_off_t offset, rec_ptr;
3018         struct list_struct rec;
3019
3020         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3021                 return ret;
3022
3023         offset = FREELIST_TOP;
3024
3025         /* read in the freelist top */
3026         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3027                 tdb_unlock(tdb, -1, F_WRLCK);
3028                 return 0;
3029         }
3030
3031         printf("freelist top=[0x%08x]\n", rec_ptr );
3032         while (rec_ptr) {
3033                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec, 
3034                                            sizeof(rec), DOCONV()) == -1) {
3035                         tdb_unlock(tdb, -1, F_WRLCK);
3036                         return -1;
3037                 }
3038
3039                 if (rec.magic != TDB_FREE_MAGIC) {
3040                         printf("bad magic 0x%08x in free list\n", rec.magic);
3041                         tdb_unlock(tdb, -1, F_WRLCK);
3042                         return -1;
3043                 }
3044
3045                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n", 
3046                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3047                 total_free += rec.rec_len;
3048
3049                 /* move to the next record */
3050                 rec_ptr = rec.next;
3051         }
3052         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
3053                (int)total_free);
3054
3055         return tdb_unlock(tdb, -1, F_WRLCK);
3056 }
3057
3058 /* file: tdb.c */
3059
3060 TDB_DATA tdb_null;
3061
3062 /*
3063   non-blocking increment of the tdb sequence number if the tdb has been opened using
3064   the TDB_SEQNUM flag
3065 */
3066 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3067 {
3068         tdb_off_t seqnum=0;
3069         
3070         if (!(tdb->flags & TDB_SEQNUM)) {
3071                 return;
3072         }
3073
3074         /* we ignore errors from this, as we have no sane way of
3075            dealing with them.
3076         */
3077         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3078         seqnum++;
3079         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3080 }
3081
3082 /*
3083   increment the tdb sequence number if the tdb has been opened using
3084   the TDB_SEQNUM flag
3085 */
3086 static void tdb_increment_seqnum(struct tdb_context *tdb)
3087 {
3088         if (!(tdb->flags & TDB_SEQNUM)) {
3089                 return;
3090         }
3091
3092         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3093                 return;
3094         }
3095
3096         tdb_increment_seqnum_nonblock(tdb);
3097
3098         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3099 }
3100
3101 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3102 {
3103         return memcmp(data.dptr, key.dptr, data.dsize);
3104 }
3105
3106 /* Returns 0 on fail.  On success, return offset of record, and fills
3107    in rec */
3108 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3109                         struct list_struct *r)
3110 {
3111         tdb_off_t rec_ptr;
3112         
3113         /* read in the hash top */
3114         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3115                 return 0;
3116
3117         /* keep looking until we find the right record */
3118         while (rec_ptr) {
3119                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3120                         return 0;
3121
3122                 if (!TDB_DEAD(r) && hash==r->full_hash
3123                     && key.dsize==r->key_len
3124                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3125                                       r->key_len, tdb_key_compare,
3126                                       NULL) == 0) {
3127                         return rec_ptr;
3128                 }
3129                 rec_ptr = r->next;
3130         }
3131         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3132 }
3133
3134 /* As tdb_find, but if you succeed, keep the lock */
3135 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3136                            struct list_struct *rec)
3137 {
3138         u32 rec_ptr;
3139
3140         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3141                 return 0;
3142         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3143                 tdb_unlock(tdb, BUCKET(hash), locktype);
3144         return rec_ptr;
3145 }
3146
3147
3148 /* update an entry in place - this only works if the new data size
3149    is <= the old data size and the key exists.
3150    on failure return -1.
3151 */
3152 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3153 {
3154         struct list_struct rec;
3155         tdb_off_t rec_ptr;
3156
3157         /* find entry */
3158         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3159                 return -1;
3160
3161         /* must be long enough key, data and tailer */
3162         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3163                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3164                 return -1;
3165         }
3166
3167         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3168                       dbuf.dptr, dbuf.dsize) == -1)
3169                 return -1;
3170
3171         if (dbuf.dsize != rec.data_len) {
3172                 /* update size */
3173                 rec.data_len = dbuf.dsize;
3174                 return tdb_rec_write(tdb, rec_ptr, &rec);
3175         }
3176  
3177         return 0;
3178 }
3179
3180 /* find an entry in the database given a key */
3181 /* If an entry doesn't exist tdb_err will be set to
3182  * TDB_ERR_NOEXIST. If a key has no data attached
3183  * then the TDB_DATA will have zero length but
3184  * a non-zero pointer
3185  */
3186 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3187 {
3188         tdb_off_t rec_ptr;
3189         struct list_struct rec;
3190         TDB_DATA ret;
3191         u32 hash;
3192
3193         /* find which hash bucket it is in */
3194         hash = tdb->hash_fn(&key);
3195         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3196                 return tdb_null;
3197
3198         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3199                                   rec.data_len);
3200         ret.dsize = rec.data_len;
3201         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3202         return ret;
3203 }
3204
3205 /*
3206  * Find an entry in the database and hand the record's data to a parsing
3207  * function. The parsing function is executed under the chain read lock, so it
3208  * should be fast and should not block on other syscalls.
3209  *
3210  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3211  *
3212  * For mmapped tdb's that do not have a transaction open it points the parsing
3213  * function directly at the mmap area, it avoids the malloc/memcpy in this
3214  * case. If a transaction is open or no mmap is available, it has to do
3215  * malloc/read/parse/free.
3216  *
3217  * This is interesting for all readers of potentially large data structures in
3218  * the tdb records, ldb indexes being one example.
3219  */
3220
3221 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3222                      int (*parser)(TDB_DATA key, TDB_DATA data,
3223                                    void *private_data),
3224                      void *private_data)
3225 {
3226         tdb_off_t rec_ptr;
3227         struct list_struct rec;
3228         int ret;
3229         u32 hash;
3230
3231         /* find which hash bucket it is in */
3232         hash = tdb->hash_fn(&key);
3233
3234         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3235                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3236         }
3237
3238         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3239                              rec.data_len, parser, private_data);
3240
3241         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3242
3243         return ret;
3244 }
3245
3246 /* check if an entry in the database exists 
3247
3248    note that 1 is returned if the key is found and 0 is returned if not found
3249    this doesn't match the conventions in the rest of this module, but is
3250    compatible with gdbm
3251 */
3252 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3253 {
3254         struct list_struct rec;
3255         
3256         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3257                 return 0;
3258         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3259         return 1;
3260 }
3261
3262 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3263 {
3264         u32 hash = tdb->hash_fn(&key);
3265         return tdb_exists_hash(tdb, key, hash);
3266 }
3267
3268 /* actually delete an entry in the database given the offset */
3269 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3270 {
3271         tdb_off_t last_ptr, i;
3272         struct list_struct lastrec;
3273
3274         if (tdb->read_only || tdb->traverse_read) return -1;
3275
3276         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3277                 /* Someone traversing here: mark it as dead */
3278                 rec->magic = TDB_DEAD_MAGIC;
3279                 return tdb_rec_write(tdb, rec_ptr, rec);
3280         }
3281         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3282                 return -1;
3283
3284         /* find previous record in hash chain */
3285         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3286                 return -1;
3287         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3288                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3289                         return -1;
3290
3291         /* unlink it: next ptr is at start of record. */
3292         if (last_ptr == 0)
3293                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3294         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3295                 return -1;
3296
3297         /* recover the space */
3298         if (tdb_free(tdb, rec_ptr, rec) == -1)
3299                 return -1;
3300         return 0;
3301 }
3302
3303 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3304 {
3305         int res = 0;
3306         tdb_off_t rec_ptr;
3307         struct list_struct rec;
3308         
3309         /* read in the hash top */
3310         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3311                 return 0;
3312
3313         while (rec_ptr) {
3314                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3315                         return 0;
3316
3317                 if (rec.magic == TDB_DEAD_MAGIC) {
3318                         res += 1;
3319                 }
3320                 rec_ptr = rec.next;
3321         }
3322         return res;
3323 }
3324
3325 /*
3326  * Purge all DEAD records from a hash chain
3327  */
3328 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3329 {
3330         int res = -1;
3331         struct list_struct rec;
3332         tdb_off_t rec_ptr;
3333
3334         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3335                 return -1;
3336         }
3337         
3338         /* read in the hash top */
3339         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3340                 goto fail;
3341
3342         while (rec_ptr) {
3343                 tdb_off_t next;
3344
3345                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3346                         goto fail;
3347                 }
3348
3349                 next = rec.next;
3350
3351                 if (rec.magic == TDB_DEAD_MAGIC
3352                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3353                         goto fail;
3354                 }
3355                 rec_ptr = next;
3356         }
3357         res = 0;
3358  fail:
3359         tdb_unlock(tdb, -1, F_WRLCK);
3360         return res;
3361 }
3362
3363 /* delete an entry in the database given a key */
3364 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3365 {
3366         tdb_off_t rec_ptr;
3367         struct list_struct rec;
3368         int ret;
3369
3370         if (tdb->max_dead_records != 0) {
3371
3372                 /*
3373                  * Allow for some dead records per hash chain, mainly for
3374                  * tdb's with a very high create/delete rate like locking.tdb.
3375                  */
3376
3377                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3378                         return -1;
3379
3380                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3381                         /*
3382                          * Don't let the per-chain freelist grow too large,
3383                          * delete all existing dead records
3384                          */
3385                         tdb_purge_dead(tdb, hash);
3386                 }
3387
3388                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3389                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3390                         return -1;
3391                 }
3392
3393                 /*
3394                  * Just mark the record as dead.
3395                  */
3396                 rec.magic = TDB_DEAD_MAGIC;
3397                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3398         }
3399         else {
3400                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3401                                                    &rec)))
3402                         return -1;
3403
3404                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3405         }
3406
3407         if (ret == 0) {
3408                 tdb_increment_seqnum(tdb);
3409         }
3410
3411         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3412                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3413         return ret;
3414 }
3415
3416 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3417 {
3418         u32 hash = tdb->hash_fn(&key);
3419         return tdb_delete_hash(tdb, key, hash);
3420 }
3421
3422 /*
3423  * See if we have a dead record around with enough space
3424  */
3425 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3426                                struct list_struct *r, tdb_len_t length)
3427 {
3428         tdb_off_t rec_ptr;
3429         
3430         /* read in the hash top */
3431         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3432                 return 0;
3433
3434         /* keep looking until we find the right record */
3435         while (rec_ptr) {
3436                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3437                         return 0;
3438
3439                 if (TDB_DEAD(r) && r->rec_len >= length) {
3440                         /*
3441                          * First fit for simple coding, TODO: change to best
3442                          * fit
3443                          */
3444                         return rec_ptr;
3445                 }
3446                 rec_ptr = r->next;
3447         }
3448         return 0;
3449 }
3450
3451 /* store an element in the database, replacing any existing element
3452    with the same key 
3453
3454    return 0 on success, -1 on failure
3455 */
3456 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3457 {
3458         struct list_struct rec;
3459         u32 hash;
3460         tdb_off_t rec_ptr;
3461         char *p = NULL;
3462         int ret = -1;
3463
3464         if (tdb->read_only || tdb->traverse_read) {
3465                 tdb->ecode = TDB_ERR_RDONLY;
3466                 return -1;
3467         }
3468
3469         /* find which hash bucket it is in */
3470         hash = tdb->hash_fn(&key);
3471         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3472                 return -1;
3473
3474         /* check for it existing, on insert. */
3475         if (flag == TDB_INSERT) {
3476                 if (tdb_exists_hash(tdb, key, hash)) {
3477                         tdb->ecode = TDB_ERR_EXISTS;
3478                         goto fail;
3479                 }
3480         } else {
3481                 /* first try in-place update, on modify or replace. */
3482                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3483                         goto done;
3484                 }
3485                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3486                     flag == TDB_MODIFY) {
3487                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3488                          we should fail the store */
3489                         goto fail;
3490                 }
3491         }
3492         /* reset the error code potentially set by the tdb_update() */
3493         tdb->ecode = TDB_SUCCESS;
3494
3495         /* delete any existing record - if it doesn't exist we don't
3496            care.  Doing this first reduces fragmentation, and avoids
3497            coalescing with `allocated' block before it's updated. */
3498         if (flag != TDB_INSERT)
3499                 tdb_delete_hash(tdb, key, hash);
3500
3501         /* Copy key+value *before* allocating free space in case malloc
3502            fails and we are left with a dead spot in the tdb. */
3503
3504         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3505                 tdb->ecode = TDB_ERR_OOM;
3506                 goto fail;
3507         }
3508
3509         memcpy(p, key.dptr, key.dsize);
3510         if (dbuf.dsize)
3511                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3512
3513         if (tdb->max_dead_records != 0) {
3514                 /*
3515                  * Allow for some dead records per hash chain, look if we can
3516                  * find one that can hold the new record. We need enough space
3517                  * for key, data and tailer. If we find one, we don't have to
3518                  * consult the central freelist.
3519                  */
3520                 rec_ptr = tdb_find_dead(
3521                         tdb, hash, &rec,
3522                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3523
3524                 if (rec_ptr != 0) {
3525                         rec.key_len = key.dsize;
3526                         rec.data_len = dbuf.dsize;
3527                         rec.full_hash = hash;
3528                         rec.magic = TDB_MAGIC;
3529                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3530                             || tdb->methods->tdb_write(
3531                                     tdb, rec_ptr + sizeof(rec),
3532                                     p, key.dsize + dbuf.dsize) == -1) {
3533                                 goto fail;
3534                         }
3535                         goto done;
3536                 }
3537         }
3538
3539         /*
3540          * We have to allocate some space from the freelist, so this means we
3541          * have to lock it. Use the chance to purge all the DEAD records from
3542          * the hash chain under the freelist lock.
3543          */
3544
3545         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3546                 goto fail;
3547         }
3548
3549         if ((tdb->max_dead_records != 0)
3550             && (tdb_purge_dead(tdb, hash) == -1)) {
3551                 tdb_unlock(tdb, -1, F_WRLCK);
3552                 goto fail;
3553         }
3554
3555         /* we have to allocate some space */
3556         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3557
3558         tdb_unlock(tdb, -1, F_WRLCK);
3559
3560         if (rec_ptr == 0) {
3561                 goto fail;
3562         }
3563
3564         /* Read hash top into next ptr */
3565         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3566                 goto fail;
3567
3568         rec.key_len = key.dsize;
3569         rec.data_len = dbuf.dsize;
3570         rec.full_hash = hash;
3571         rec.magic = TDB_MAGIC;
3572
3573         /* write out and point the top of the hash chain at it */
3574         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3575             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3576             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3577                 /* Need to tdb_unallocate() here */
3578                 goto fail;
3579         }
3580
3581  done:
3582         ret = 0;
3583  fail:
3584         if (ret == 0) {
3585                 tdb_increment_seqnum(tdb);
3586         }
3587
3588         SAFE_FREE(p); 
3589         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3590         return ret;
3591 }
3592
3593
3594 /* Append to an entry. Create if not exist. */
3595 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3596 {
3597         u32 hash;
3598         TDB_DATA dbuf;
3599         int ret = -1;
3600
3601         /* find which hash bucket it is in */
3602         hash = tdb->hash_fn(&key);
3603         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3604                 return -1;
3605
3606         dbuf = tdb_fetch(tdb, key);
3607
3608         if (dbuf.dptr == NULL) {
3609                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3610         } else {
3611                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3612                                                      dbuf.dsize + new_dbuf.dsize);
3613                 if (new_dptr == NULL) {
3614                         free(dbuf.dptr);
3615                 }
3616                 dbuf.dptr = new_dptr;
3617         }
3618
3619         if (dbuf.dptr == NULL) {
3620                 tdb->ecode = TDB_ERR_OOM;
3621                 goto failed;
3622         }
3623
3624         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3625         dbuf.dsize += new_dbuf.dsize;
3626
3627         ret = tdb_store(tdb, key, dbuf, 0);
3628         
3629 failed:
3630         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3631         SAFE_FREE(dbuf.dptr);
3632         return ret;
3633 }
3634
3635
3636 /*
3637   return the name of the current tdb file
3638   useful for external logging functions
3639 */
3640 const char *tdb_name(struct tdb_context *tdb)
3641 {
3642         return tdb->name;
3643 }
3644
3645 /*
3646   return the underlying file descriptor being used by tdb, or -1
3647   useful for external routines that want to check the device/inode
3648   of the fd
3649 */
3650 int tdb_fd(struct tdb_context *tdb)
3651 {
3652         return tdb->fd;
3653 }
3654
3655 /*
3656   return the current logging function
3657   useful for external tdb routines that wish to log tdb errors
3658 */
3659 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3660 {
3661         return tdb->log.log_fn;
3662 }
3663
3664
3665 /*
3666   get the tdb sequence number. Only makes sense if the writers opened
3667   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3668   quickly, so it should only be used for a 'has something changed'
3669   test, not for code that relies on the count of the number of changes
3670   made. If you want a counter then use a tdb record.
3671
3672   The aim of this sequence number is to allow for a very lightweight
3673   test of a possible tdb change.
3674 */
3675 int tdb_get_seqnum(struct tdb_context *tdb)
3676 {
3677         tdb_off_t seqnum=0;
3678
3679         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3680         return seqnum;
3681 }
3682
3683 int tdb_hash_size(struct tdb_context *tdb)
3684 {
3685         return tdb->header.hash_size;
3686 }
3687
3688 size_t tdb_map_size(struct tdb_context *tdb)
3689 {
3690         return tdb->map_size;
3691 }
3692
3693 int tdb_get_flags(struct tdb_context *tdb)
3694 {
3695         return tdb->flags;
3696 }
3697
3698
3699 /*
3700   enable sequence number handling on an open tdb
3701 */
3702 void tdb_enable_seqnum(struct tdb_context *tdb)
3703 {
3704         tdb->flags |= TDB_SEQNUM;
3705 }
3706
3707 /* file: open.c */
3708
3709 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3710 static struct tdb_context *tdbs = NULL;
3711
3712
3713 /* This is based on the hash algorithm from gdbm */
3714 static unsigned int default_tdb_hash(TDB_DATA *key)
3715 {
3716         u32 value;      /* Used to compute the hash value.  */
3717         u32   i;        /* Used to cycle through random values. */
3718
3719         /* Set the initial value from the key size. */
3720         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3721                 value = (value + (key->dptr[i] << (i*5 % 24)));
3722
3723         return (1103515243 * value + 12345);  
3724 }
3725
3726
3727 /* initialise a new database with a specified hash size */
3728 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3729 {
3730         struct tdb_header *newdb;
3731         int size, ret = -1;
3732
3733         /* We make it up in memory, then write it out if not internal */
3734         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3735         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3736                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3737
3738         /* Fill in the header */
3739         newdb->version = TDB_VERSION;
3740         newdb->hash_size = hash_size;
3741         if (tdb->flags & TDB_INTERNAL) {
3742                 tdb->map_size = size;
3743                 tdb->map_ptr = (char *)newdb;
3744                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3745                 /* Convert the `ondisk' version if asked. */
3746                 CONVERT(*newdb);
3747                 return 0;
3748         }
3749         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3750                 goto fail;
3751
3752         if (ftruncate(tdb->fd, 0) == -1)
3753                 goto fail;
3754
3755         /* This creates an endian-converted header, as if read from disk */
3756         CONVERT(*newdb);
3757         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3758         /* Don't endian-convert the magic food! */
3759         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3760         if (write(tdb->fd, newdb, size) != size) {
3761                 ret = -1;
3762         } else {
3763                 ret = 0;
3764         }
3765
3766   fail:
3767         SAFE_FREE(newdb);
3768         return ret;
3769 }
3770
3771
3772
3773 static int tdb_already_open(dev_t device,
3774                             ino_t ino)
3775 {
3776         struct tdb_context *i;
3777         
3778         for (i = tdbs; i; i = i->next) {
3779                 if (i->device == device && i->inode == ino) {
3780                         return 1;
3781                 }
3782         }
3783
3784         return 0;
3785 }
3786
3787 /* open the database, creating it if necessary 
3788
3789    The open_flags and mode are passed straight to the open call on the
3790    database file. A flags value of O_WRONLY is invalid. The hash size
3791    is advisory, use zero for a default value.
3792
3793    Return is NULL on error, in which case errno is also set.  Don't 
3794    try to call tdb_error or tdb_errname, just do strerror(errno).
3795
3796    @param name may be NULL for internal databases. */
3797 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3798                       int open_flags, mode_t mode)
3799 {
3800         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3801 }
3802
3803 /* a default logging function */
3804 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3805 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3806 {
3807 }
3808
3809
3810 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3811                                 int open_flags, mode_t mode,
3812                                 const struct tdb_logging_context *log_ctx,
3813                                 tdb_hash_func hash_fn)
3814 {
3815         struct tdb_context *tdb;
3816         struct stat st;
3817         int rev = 0, locked = 0;
3818         unsigned char *vp;
3819         u32 vertest;
3820
3821         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3822                 /* Can't log this */
3823                 errno = ENOMEM;
3824                 goto fail;
3825         }
3826         tdb_io_init(tdb);
3827         tdb->fd = -1;
3828         tdb->name = NULL;
3829         tdb->map_ptr = NULL;
3830         tdb->flags = tdb_flags;
3831         tdb->open_flags = open_flags;
3832         if (log_ctx) {
3833                 tdb->log = *log_ctx;
3834         } else {
3835                 tdb->log.log_fn = null_log_fn;
3836                 tdb->log.log_private = NULL;
3837         }
3838         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3839
3840         /* cache the page size */
3841         tdb->page_size = getpagesize();
3842         if (tdb->page_size <= 0) {
3843                 tdb->page_size = 0x2000;
3844         }
3845
3846         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3847                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3848                          name));
3849                 errno = EINVAL;
3850                 goto fail;
3851         }
3852         
3853         if (hash_size == 0)
3854                 hash_size = DEFAULT_HASH_SIZE;
3855         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3856                 tdb->read_only = 1;
3857                 /* read only databases don't do locking or clear if first */
3858                 tdb->flags |= TDB_NOLOCK;
3859                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3860         }
3861
3862         /* internal databases don't mmap or lock, and start off cleared */
3863         if (tdb->flags & TDB_INTERNAL) {
3864                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3865                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3866                 if (tdb_new_database(tdb, hash_size) != 0) {
3867                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3868                         goto fail;
3869                 }
3870                 goto internal;
3871         }
3872
3873         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3874                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3875                          name, strerror(errno)));
3876                 goto fail;      /* errno set by open(2) */
3877         }
3878
3879         /* ensure there is only one process initialising at once */
3880         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3881                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3882                          name, strerror(errno)));
3883                 goto fail;      /* errno set by tdb_brlock */
3884         }
3885
3886         /* we need to zero database if we are the only one with it open */
3887         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3888             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3889                 open_flags |= O_CREAT;
3890                 if (ftruncate(tdb->fd, 0) == -1) {
3891                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3892                                  "failed to truncate %s: %s\n",
3893                                  name, strerror(errno)));
3894                         goto fail; /* errno set by ftruncate */
3895                 }
3896         }
3897
3898         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3899             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3900             || (tdb->header.version != TDB_VERSION
3901                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3902                 /* its not a valid database - possibly initialise it */
3903                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3904                         errno = EIO; /* ie bad format or something */
3905                         goto fail;
3906                 }
3907                 rev = (tdb->flags & TDB_CONVERT);
3908         }
3909         vp = (unsigned char *)&tdb->header.version;
3910         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3911                   (((u32)vp[2]) << 8) | (u32)vp[3];
3912         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3913         if (!rev)
3914                 tdb->flags &= ~TDB_CONVERT;
3915         else {
3916                 tdb->flags |= TDB_CONVERT;
3917                 tdb_convert(&tdb->header, sizeof(tdb->header));
3918         }
3919         if (fstat(tdb->fd, &st) == -1)
3920                 goto fail;
3921
3922         if (tdb->header.rwlocks != 0) {
3923                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3924                 goto fail;
3925         }
3926
3927         /* Is it already in the open list?  If so, fail. */
3928         if (tdb_already_open(st.st_dev, st.st_ino)) {
3929                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3930                          "%s (%d,%d) is already open in this process\n",
3931                          name, (int)st.st_dev, (int)st.st_ino));
3932                 errno = EBUSY;
3933                 goto fail;
3934         }
3935
3936         if (!(tdb->name = (char *)strdup(name))) {
3937                 errno = ENOMEM;
3938                 goto fail;
3939         }
3940
3941         tdb->map_size = st.st_size;
3942         tdb->device = st.st_dev;
3943         tdb->inode = st.st_ino;
3944         tdb->max_dead_records = 0;
3945         tdb_mmap(tdb);
3946         if (locked) {
3947                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3948                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3949                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3950                                  name, strerror(errno)));
3951                         goto fail;
3952                 }
3953
3954         }
3955
3956         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3957            we didn't get the initial exclusive lock as we need to let all other
3958            users know we're using it. */
3959
3960         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3961                 /* leave this lock in place to indicate it's in use */
3962                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3963                         goto fail;
3964         }
3965
3966         /* if needed, run recovery */
3967         if (tdb_transaction_recover(tdb) == -1) {
3968                 goto fail;
3969         }
3970
3971  internal:
3972         /* Internal (memory-only) databases skip all the code above to
3973          * do with disk files, and resume here by releasing their
3974          * global lock and hooking into the active list. */
3975         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3976                 goto fail;
3977         tdb->next = tdbs;
3978         tdbs = tdb;
3979         return tdb;
3980
3981  fail:
3982         { int save_errno = errno;
3983
3984         if (!tdb)
3985                 return NULL;
3986         
3987         if (tdb->map_ptr) {
3988                 if (tdb->flags & TDB_INTERNAL)
3989                         SAFE_FREE(tdb->map_ptr);
3990                 else
3991                         tdb_munmap(tdb);
3992         }
3993         SAFE_FREE(tdb->name);
3994         if (tdb->fd != -1)
3995                 if (close(tdb->fd) != 0)
3996                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3997         SAFE_FREE(tdb);
3998         errno = save_errno;
3999         return NULL;
4000         }
4001 }
4002
4003 /*
4004  * Set the maximum number of dead records per hash chain
4005  */
4006
4007 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4008 {
4009         tdb->max_dead_records = max_dead;
4010 }
4011
4012 /**
4013  * Close a database.
4014  *
4015  * @returns -1 for error; 0 for success.
4016  **/
4017 int tdb_close(struct tdb_context *tdb)
4018 {
4019         struct tdb_context **i;
4020         int ret = 0;
4021
4022         if (tdb->transaction) {
4023                 tdb_transaction_cancel(tdb);
4024         }
4025
4026         if (tdb->map_ptr) {
4027                 if (tdb->flags & TDB_INTERNAL)
4028                         SAFE_FREE(tdb->map_ptr);
4029                 else
4030                         tdb_munmap(tdb);
4031         }
4032         SAFE_FREE(tdb->name);
4033         if (tdb->fd != -1)
4034                 ret = close(tdb->fd);
4035         SAFE_FREE(tdb->lockrecs);
4036
4037         /* Remove from contexts list */
4038         for (i = &tdbs; *i; i = &(*i)->next) {
4039                 if (*i == tdb) {
4040                         *i = tdb->next;
4041                         break;
4042                 }
4043         }
4044
4045         memset(tdb, 0, sizeof(*tdb));
4046         SAFE_FREE(tdb);
4047
4048         return ret;
4049 }
4050
4051 /* register a loging function */
4052 void tdb_set_logging_function(struct tdb_context *tdb,
4053                               const struct tdb_logging_context *log_ctx)
4054 {
4055         tdb->log = *log_ctx;
4056 }
4057
4058 void *tdb_get_logging_private(struct tdb_context *tdb)
4059 {
4060         return tdb->log.log_private;
4061 }
4062
4063 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4064    seek pointer from our parent and to re-establish locks */
4065 int tdb_reopen(struct tdb_context *tdb)
4066 {
4067         struct stat st;
4068
4069         if (tdb->flags & TDB_INTERNAL) {
4070                 return 0; /* Nothing to do. */
4071         }
4072
4073         if (tdb->num_locks != 0 || tdb->global_lock.count) {
4074                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4075                 goto fail;
4076         }
4077
4078         if (tdb->transaction != 0) {
4079                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4080                 goto fail;
4081         }
4082
4083         if (tdb_munmap(tdb) != 0) {
4084                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4085                 goto fail;
4086         }
4087         if (close(tdb->fd) != 0)
4088                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4089         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4090         if (tdb->fd == -1) {
4091                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4092                 goto fail;
4093         }
4094         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && 
4095             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4096                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4097                 goto fail;
4098         }
4099         if (fstat(tdb->fd, &st) != 0) {
4100                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4101                 goto fail;
4102         }
4103         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4104                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4105                 goto fail;
4106         }
4107         tdb_mmap(tdb);
4108
4109         return 0;
4110
4111 fail:
4112         tdb_close(tdb);
4113         return -1;
4114 }
4115
4116 /* reopen all tdb's */
4117 int tdb_reopen_all(int parent_longlived)
4118 {
4119         struct tdb_context *tdb;
4120
4121         for (tdb=tdbs; tdb; tdb = tdb->next) {
4122                 /*
4123                  * If the parent is longlived (ie. a
4124                  * parent daemon architecture), we know
4125                  * it will keep it's active lock on a
4126                  * tdb opened with CLEAR_IF_FIRST. Thus
4127                  * for child processes we don't have to
4128                  * add an active lock. This is essential
4129                  * to improve performance on systems that
4130                  * keep POSIX locks as a non-scalable data
4131                  * structure in the kernel.
4132                  */
4133                 if (parent_longlived) {
4134                         /* Ensure no clear-if-first. */
4135                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4136                 }
4137
4138                 if (tdb_reopen(tdb) != 0)
4139                         return -1;
4140         }
4141
4142         return 0;
4143 }