Whamcloud - gitweb
libext2fs: fix tdb.c mmap leak
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /*
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3 Rev: 23590
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5 */
6  /*
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #define _XOPEN_SOURCE 600
40
41 #include "config.h"
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <stdarg.h>
46 #include <stddef.h>
47 #include <errno.h>
48 #include <string.h>
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
51 #endif
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <time.h>
55 #ifdef HAVE_UTIME_H
56 #include <utime.h>
57 #endif
58 #include <sys/stat.h>
59 #include <sys/file.h>
60 #include <fcntl.h>
61
62 #ifdef HAVE_SYS_MMAN_H
63 #include <sys/mman.h>
64 #endif
65
66 #ifndef MAP_FILE
67 #define MAP_FILE 0
68 #endif
69
70 #ifndef MAP_FAILED
71 #define MAP_FAILED ((void *)-1)
72 #endif
73
74 #ifndef HAVE_STRDUP
75 #define strdup rep_strdup
76 static char *rep_strdup(const char *s)
77 {
78         char *ret;
79         int length;
80         if (!s)
81                 return NULL;
82
83         if (!length)
84                 length = strlen(s);
85
86         ret = malloc(length + 1);
87         if (ret) {
88                 strncpy(ret, s, length);
89                 ret[length] = '\0';
90         }
91         return ret;
92 }
93 #endif
94
95 #ifndef PRINTF_ATTRIBUTE
96 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
97 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
98  * the parameter containing the format, and a2 the index of the first
99  * argument. Note that some gcc 2.x versions don't handle this
100  * properly **/
101 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
102 #else
103 #define PRINTF_ATTRIBUTE(a1, a2)
104 #endif
105 #endif
106
107 typedef int bool;
108
109 #include "tdb.h"
110
111 static TDB_DATA tdb_null;
112
113 #ifndef u32
114 #define u32 unsigned
115 #endif
116
117 typedef u32 tdb_len_t;
118 typedef u32 tdb_off_t;
119
120 #ifndef offsetof
121 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
122 #endif
123
124 #define TDB_MAGIC_FOOD "TDB file\n"
125 #define TDB_VERSION (0x26011967 + 6)
126 #define TDB_MAGIC (0x26011999U)
127 #define TDB_FREE_MAGIC (~TDB_MAGIC)
128 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
129 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
130 #define TDB_ALIGNMENT 4
131 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
132 #define DEFAULT_HASH_SIZE 131
133 #define FREELIST_TOP (sizeof(struct tdb_header))
134 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
135 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
136 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
137 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
138 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
139 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
140 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
141 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
142 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
143 #define TDB_PAD_BYTE 0x42
144 #define TDB_PAD_U32  0x42424242
145
146 /* NB assumes there is a local variable called "tdb" that is the
147  * current context, also takes doubly-parenthesized print-style
148  * argument. */
149 #define TDB_LOG(x) tdb->log.log_fn x
150
151 /* lock offsets */
152 #define GLOBAL_LOCK      0
153 #define ACTIVE_LOCK      4
154 #define TRANSACTION_LOCK 8
155
156 /* free memory if the pointer is valid and zero the pointer */
157 #ifndef SAFE_FREE
158 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
159 #endif
160
161 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
162
163 #define DOCONV() (tdb->flags & TDB_CONVERT)
164 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
165
166
167 /* the body of the database is made of one list_struct for the free space
168    plus a separate data list for each hash value */
169 struct list_struct {
170         tdb_off_t next; /* offset of the next record in the list */
171         tdb_len_t rec_len; /* total byte length of record */
172         tdb_len_t key_len; /* byte length of key */
173         tdb_len_t data_len; /* byte length of data */
174         u32 full_hash; /* the full 32 bit hash of the key */
175         u32 magic;   /* try to catch errors */
176         /* the following union is implied:
177                 union {
178                         char record[rec_len];
179                         struct {
180                                 char key[key_len];
181                                 char data[data_len];
182                         }
183                         u32 totalsize; (tailer)
184                 }
185         */
186 };
187
188
189 /* this is stored at the front of every database */
190 struct tdb_header {
191         char magic_food[32]; /* for /etc/magic */
192         u32 version; /* version of the code */
193         u32 hash_size; /* number of hash entries */
194         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
195         tdb_off_t recovery_start; /* offset of transaction recovery region */
196         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
197         tdb_off_t reserved[29];
198 };
199
200 struct tdb_lock_type {
201         int list;
202         u32 count;
203         u32 ltype;
204 };
205
206 struct tdb_traverse_lock {
207         struct tdb_traverse_lock *next;
208         u32 off;
209         u32 hash;
210         int lock_rw;
211 };
212
213
214 struct tdb_methods {
215         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
216         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
217         void (*next_hash_chain)(struct tdb_context *, u32 *);
218         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
219         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
220         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
221 };
222
223 struct tdb_context {
224         char *name; /* the name of the database */
225         void *map_ptr; /* where it is currently mapped */
226         int fd; /* open file descriptor for the database */
227         tdb_len_t map_size; /* how much space has been mapped */
228         int read_only; /* opened read-only */
229         int traverse_read; /* read-only traversal */
230         struct tdb_lock_type global_lock;
231         int num_lockrecs;
232         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
233         enum TDB_ERROR ecode; /* error code for last tdb error */
234         struct tdb_header header; /* a cached copy of the header */
235         u32 flags; /* the flags passed to tdb_open */
236         struct tdb_traverse_lock travlocks; /* current traversal locks */
237         struct tdb_context *next; /* all tdbs to avoid multiple opens */
238         dev_t device;   /* uniquely identifies this tdb */
239         ino_t inode;    /* uniquely identifies this tdb */
240         struct tdb_logging_context log;
241         unsigned int (*hash_fn)(TDB_DATA *key);
242         int open_flags; /* flags used in the open - needed by reopen */
243         unsigned int num_locks; /* number of chain locks held */
244         const struct tdb_methods *methods;
245         struct tdb_transaction *transaction;
246         int page_size;
247         int max_dead_records;
248         bool have_transaction_lock;
249         tdb_len_t real_map_size; /* how much space has been mapped */
250 };
251
252
253 /*
254   internal prototypes
255 */
256 static int tdb_munmap(struct tdb_context *tdb);
257 static void tdb_mmap(struct tdb_context *tdb);
258 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
259 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
260 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
261 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
262 static int tdb_transaction_unlock(struct tdb_context *tdb);
263 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
264 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
265 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
266 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268 static void *tdb_convert(void *buf, u32 size);
269 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
271 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
273 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
274 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
275 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
277 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
278 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
279 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
280                    tdb_off_t offset, tdb_len_t len,
281                    int (*parser)(TDB_DATA key, TDB_DATA data,
282                                  void *private_data),
283                    void *private_data);
284 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
285                            struct list_struct *rec);
286 static void tdb_io_init(struct tdb_context *tdb);
287 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
288 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
289                       struct list_struct *rec);
290
291
292 /* file: error.c */
293
294 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
295 {
296         return tdb->ecode;
297 }
298
299 static struct tdb_errname {
300         enum TDB_ERROR ecode; const char *estring;
301 } emap[] = { {TDB_SUCCESS, "Success"},
302              {TDB_ERR_CORRUPT, "Corrupt database"},
303              {TDB_ERR_IO, "IO Error"},
304              {TDB_ERR_LOCK, "Locking error"},
305              {TDB_ERR_OOM, "Out of memory"},
306              {TDB_ERR_EXISTS, "Record exists"},
307              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
308              {TDB_ERR_EINVAL, "Invalid parameter"},
309              {TDB_ERR_NOEXIST, "Record does not exist"},
310              {TDB_ERR_RDONLY, "write not permitted"} };
311
312 /* Error string for the last tdb error */
313 const char *tdb_errorstr(struct tdb_context *tdb)
314 {
315         u32 i;
316         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
317                 if (tdb->ecode == emap[i].ecode)
318                         return emap[i].estring;
319         return "Invalid error code";
320 }
321
322 /* file: lock.c */
323
324 #define TDB_MARK_LOCK 0x80000000
325
326 /* a byte range locking function - return 0 on success
327    this functions locks/unlocks 1 byte at the specified offset.
328
329    On error, errno is also set so that errors are passed back properly
330    through tdb_open().
331
332    note that a len of zero means lock to end of file
333 */
334 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
335                int rw_type, int lck_type, int probe, size_t len)
336 {
337         struct flock fl;
338         int ret;
339
340         if (tdb->flags & TDB_NOLOCK) {
341                 return 0;
342         }
343
344         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
345                 tdb->ecode = TDB_ERR_RDONLY;
346                 return -1;
347         }
348
349         fl.l_type = rw_type;
350         fl.l_whence = SEEK_SET;
351         fl.l_start = offset;
352         fl.l_len = len;
353         fl.l_pid = 0;
354
355         do {
356                 ret = fcntl(tdb->fd,lck_type,&fl);
357         } while (ret == -1 && errno == EINTR);
358
359         if (ret == -1) {
360                 /* Generic lock error. errno set by fcntl.
361                  * EAGAIN is an expected return from non-blocking
362                  * locks. */
363                 if (!probe && lck_type != F_SETLK) {
364                         /* Ensure error code is set for log fun to examine. */
365                         tdb->ecode = TDB_ERR_LOCK;
366                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
367                                  tdb->fd, offset, rw_type, lck_type, (int)len));
368                 }
369                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
370         }
371         return 0;
372 }
373
374
375 /*
376   upgrade a read lock to a write lock. This needs to be handled in a
377   special way as some OSes (such as solaris) have too conservative
378   deadlock detection and claim a deadlock when progress can be
379   made. For those OSes we may loop for a while.
380 */
381 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
382 {
383         int count = 1000;
384         while (count--) {
385                 struct timeval tv;
386                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
387                         return 0;
388                 }
389                 if (errno != EDEADLK) {
390                         break;
391                 }
392                 /* sleep for as short a time as we can - more portable than usleep() */
393                 tv.tv_sec = 0;
394                 tv.tv_usec = 1;
395                 select(0, NULL, NULL, NULL, &tv);
396         }
397         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
398         return -1;
399 }
400
401
402 /* lock a list in the database. list -1 is the alloc list */
403 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
404 {
405         struct tdb_lock_type *new_lck;
406         int i;
407         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
408
409         ltype &= ~TDB_MARK_LOCK;
410
411         /* a global lock allows us to avoid per chain locks */
412         if (tdb->global_lock.count &&
413             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
414                 return 0;
415         }
416
417         if (tdb->global_lock.count) {
418                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
419         }
420
421         if (list < -1 || list >= (int)tdb->header.hash_size) {
422                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
423                            list, ltype));
424                 return -1;
425         }
426         if (tdb->flags & TDB_NOLOCK)
427                 return 0;
428
429         for (i=0; i<tdb->num_lockrecs; i++) {
430                 if (tdb->lockrecs[i].list == list) {
431                         if (tdb->lockrecs[i].count == 0) {
432                                 /*
433                                  * Can't happen, see tdb_unlock(). It should
434                                  * be an assert.
435                                  */
436                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
437                                          "lck->count == 0 for list %d", list));
438                         }
439                         /*
440                          * Just increment the in-memory struct, posix locks
441                          * don't stack.
442                          */
443                         tdb->lockrecs[i].count++;
444                         return 0;
445                 }
446         }
447
448         new_lck = (struct tdb_lock_type *)realloc(
449                 tdb->lockrecs,
450                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
451         if (new_lck == NULL) {
452                 errno = ENOMEM;
453                 return -1;
454         }
455         tdb->lockrecs = new_lck;
456
457         /* Since fcntl locks don't nest, we do a lock for the first one,
458            and simply bump the count for future ones */
459         if (!mark_lock &&
460             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
461                                      0, 1)) {
462                 return -1;
463         }
464
465         tdb->num_locks++;
466
467         tdb->lockrecs[tdb->num_lockrecs].list = list;
468         tdb->lockrecs[tdb->num_lockrecs].count = 1;
469         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
470         tdb->num_lockrecs += 1;
471
472         return 0;
473 }
474
475 /* lock a list in the database. list -1 is the alloc list */
476 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
477 {
478         int ret;
479         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
480         if (ret) {
481                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
482                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
483         }
484         return ret;
485 }
486
487 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
488 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
489 {
490         return _tdb_lock(tdb, list, ltype, F_SETLK);
491 }
492
493
494 /* unlock the database: returns void because it's too late for errors. */
495         /* changed to return int it may be interesting to know there
496            has been an error  --simo */
497 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
498 {
499         int ret = -1;
500         int i;
501         struct tdb_lock_type *lck = NULL;
502         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
503
504         ltype &= ~TDB_MARK_LOCK;
505
506         /* a global lock allows us to avoid per chain locks */
507         if (tdb->global_lock.count &&
508             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
509                 return 0;
510         }
511
512         if (tdb->global_lock.count) {
513                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
514         }
515
516         if (tdb->flags & TDB_NOLOCK)
517                 return 0;
518
519         /* Sanity checks */
520         if (list < -1 || list >= (int)tdb->header.hash_size) {
521                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
522                 return ret;
523         }
524
525         for (i=0; i<tdb->num_lockrecs; i++) {
526                 if (tdb->lockrecs[i].list == list) {
527                         lck = &tdb->lockrecs[i];
528                         break;
529                 }
530         }
531
532         if ((lck == NULL) || (lck->count == 0)) {
533                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
534                 return -1;
535         }
536
537         if (lck->count > 1) {
538                 lck->count--;
539                 return 0;
540         }
541
542         /*
543          * This lock has count==1 left, so we need to unlock it in the
544          * kernel. We don't bother with decrementing the in-memory array
545          * element, we're about to overwrite it with the last array element
546          * anyway.
547          */
548
549         if (mark_lock) {
550                 ret = 0;
551         } else {
552                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
553                                                F_SETLKW, 0, 1);
554         }
555         tdb->num_locks--;
556
557         /*
558          * Shrink the array by overwriting the element just unlocked with the
559          * last array element.
560          */
561
562         if (tdb->num_lockrecs > 1) {
563                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
564         }
565         tdb->num_lockrecs -= 1;
566
567         /*
568          * We don't bother with realloc when the array shrinks, but if we have
569          * a completely idle tdb we should get rid of the locked array.
570          */
571
572         if (tdb->num_lockrecs == 0) {
573                 SAFE_FREE(tdb->lockrecs);
574         }
575
576         if (ret)
577                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
578         return ret;
579 }
580
581 /*
582   get the transaction lock
583  */
584 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
585 {
586         if (tdb->have_transaction_lock || tdb->global_lock.count) {
587                 return 0;
588         }
589         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
590                                      F_SETLKW, 0, 1) == -1) {
591                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
592                 tdb->ecode = TDB_ERR_LOCK;
593                 return -1;
594         }
595         tdb->have_transaction_lock = 1;
596         return 0;
597 }
598
599 /*
600   release the transaction lock
601  */
602 int tdb_transaction_unlock(struct tdb_context *tdb)
603 {
604         int ret;
605         if (!tdb->have_transaction_lock) {
606                 return 0;
607         }
608         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
609         if (ret == 0) {
610                 tdb->have_transaction_lock = 0;
611         }
612         return ret;
613 }
614
615
616
617
618 /* lock/unlock entire database */
619 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
620 {
621         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
622
623         ltype &= ~TDB_MARK_LOCK;
624
625         /* There are no locks on read-only dbs */
626         if (tdb->read_only || tdb->traverse_read)
627                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
628
629         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
630                 tdb->global_lock.count++;
631                 return 0;
632         }
633
634         if (tdb->global_lock.count) {
635                 /* a global lock of a different type exists */
636                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
637         }
638
639         if (tdb->num_locks != 0) {
640                 /* can't combine global and chain locks */
641                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
642         }
643
644         if (!mark_lock &&
645             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
646                                      0, 4*tdb->header.hash_size)) {
647                 if (op == F_SETLKW) {
648                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
649                 }
650                 return -1;
651         }
652
653         tdb->global_lock.count = 1;
654         tdb->global_lock.ltype = ltype;
655
656         return 0;
657 }
658
659
660
661 /* unlock entire db */
662 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
663 {
664         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
665
666         ltype &= ~TDB_MARK_LOCK;
667
668         /* There are no locks on read-only dbs */
669         if (tdb->read_only || tdb->traverse_read) {
670                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
671         }
672
673         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
674                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
675         }
676
677         if (tdb->global_lock.count > 1) {
678                 tdb->global_lock.count--;
679                 return 0;
680         }
681
682         if (!mark_lock &&
683             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
684                                      0, 4*tdb->header.hash_size)) {
685                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
686                 return -1;
687         }
688
689         tdb->global_lock.count = 0;
690         tdb->global_lock.ltype = 0;
691
692         return 0;
693 }
694
695 /* lock entire database with write lock */
696 int tdb_lockall(struct tdb_context *tdb)
697 {
698         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
699 }
700
701 /* lock entire database with write lock - mark only */
702 int tdb_lockall_mark(struct tdb_context *tdb)
703 {
704         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
705 }
706
707 /* unlock entire database with write lock - unmark only */
708 int tdb_lockall_unmark(struct tdb_context *tdb)
709 {
710         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
711 }
712
713 /* lock entire database with write lock - nonblocking varient */
714 int tdb_lockall_nonblock(struct tdb_context *tdb)
715 {
716         return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
717 }
718
719 /* unlock entire database with write lock */
720 int tdb_unlockall(struct tdb_context *tdb)
721 {
722         return _tdb_unlockall(tdb, F_WRLCK);
723 }
724
725 /* lock entire database with read lock */
726 int tdb_lockall_read(struct tdb_context *tdb)
727 {
728         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
729 }
730
731 /* lock entire database with read lock - nonblock varient */
732 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
733 {
734         return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
735 }
736
737 /* unlock entire database with read lock */
738 int tdb_unlockall_read(struct tdb_context *tdb)
739 {
740         return _tdb_unlockall(tdb, F_RDLCK);
741 }
742
743 /* lock/unlock one hash chain. This is meant to be used to reduce
744    contention - it cannot guarantee how many records will be locked */
745 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
746 {
747         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
748 }
749
750 /* lock/unlock one hash chain, non-blocking. This is meant to be used
751    to reduce contention - it cannot guarantee how many records will be
752    locked */
753 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
754 {
755         return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
756 }
757
758 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
759 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
760 {
761         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
762 }
763
764 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
765 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
766 {
767         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
768 }
769
770 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
771 {
772         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
773 }
774
775 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
776 {
777         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
778 }
779
780 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
781 {
782         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
783 }
784
785
786
787 /* record lock stops delete underneath */
788 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
789 {
790         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
791 }
792
793 /*
794   Write locks override our own fcntl readlocks, so check it here.
795   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
796   an error to fail to get the lock here.
797 */
798 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
799 {
800         struct tdb_traverse_lock *i;
801         for (i = &tdb->travlocks; i; i = i->next)
802                 if (i->off == off)
803                         return -1;
804         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
805 }
806
807 /*
808   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
809   an error to fail to get the lock here.
810 */
811 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
812 {
813         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
814 }
815
816 /* fcntl locks don't stack: avoid unlocking someone else's */
817 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
818 {
819         struct tdb_traverse_lock *i;
820         u32 count = 0;
821
822         if (off == 0)
823                 return 0;
824         for (i = &tdb->travlocks; i; i = i->next)
825                 if (i->off == off)
826                         count++;
827         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
828 }
829
830 /* file: io.c */
831
832 /* check for an out of bounds access - if it is out of bounds then
833    see if the database has been expanded by someone else and expand
834    if necessary
835    note that "len" is the minimum length needed for the db
836 */
837 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
838 {
839         struct stat st;
840         if (len <= tdb->map_size)
841                 return 0;
842         if (tdb->flags & TDB_INTERNAL) {
843                 if (!probe) {
844                         /* Ensure ecode is set for log fn. */
845                         tdb->ecode = TDB_ERR_IO;
846                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
847                                  (int)len, (int)tdb->map_size));
848                 }
849                 return TDB_ERRCODE(TDB_ERR_IO, -1);
850         }
851
852         if (fstat(tdb->fd, &st) == -1) {
853                 return TDB_ERRCODE(TDB_ERR_IO, -1);
854         }
855
856         if (st.st_size < (size_t)len) {
857                 if (!probe) {
858                         /* Ensure ecode is set for log fn. */
859                         tdb->ecode = TDB_ERR_IO;
860                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
861                                  (int)len, (int)st.st_size));
862                 }
863                 return TDB_ERRCODE(TDB_ERR_IO, -1);
864         }
865
866         /* Unmap, update size, remap */
867         if (tdb_munmap(tdb) == -1)
868                 return TDB_ERRCODE(TDB_ERR_IO, -1);
869         tdb->map_size = st.st_size;
870         tdb_mmap(tdb);
871         return 0;
872 }
873
874 /* write a lump of data at a specified offset */
875 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
876                      const void *buf, tdb_len_t len)
877 {
878         if (len == 0) {
879                 return 0;
880         }
881
882         if (tdb->read_only || tdb->traverse_read) {
883                 tdb->ecode = TDB_ERR_RDONLY;
884                 return -1;
885         }
886
887         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
888                 return -1;
889
890         if (tdb->map_ptr) {
891                 memcpy(off + (char *)tdb->map_ptr, buf, len);
892         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
893                 /* Ensure ecode is set for log fn. */
894                 tdb->ecode = TDB_ERR_IO;
895                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
896                            off, len, strerror(errno)));
897                 return TDB_ERRCODE(TDB_ERR_IO, -1);
898         }
899         return 0;
900 }
901
902 /* Endian conversion: we only ever deal with 4 byte quantities */
903 void *tdb_convert(void *buf, u32 size)
904 {
905         u32 i, *p = (u32 *)buf;
906         for (i = 0; i < size / 4; i++)
907                 p[i] = TDB_BYTEREV(p[i]);
908         return buf;
909 }
910
911
912 /* read a lump of data at a specified offset, maybe convert */
913 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
914                     tdb_len_t len, int cv)
915 {
916         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
917                 return -1;
918         }
919
920         if (tdb->map_ptr) {
921                 memcpy(buf, off + (char *)tdb->map_ptr, len);
922         } else {
923                 ssize_t ret = pread(tdb->fd, buf, len, off);
924                 if (ret != (ssize_t)len) {
925                         /* Ensure ecode is set for log fn. */
926                         tdb->ecode = TDB_ERR_IO;
927                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
928                                  "len=%d ret=%d (%s) map_size=%d\n",
929                                  (int)off, (int)len, (int)ret, strerror(errno),
930                                  (int)tdb->map_size));
931                         return TDB_ERRCODE(TDB_ERR_IO, -1);
932                 }
933         }
934         if (cv) {
935                 tdb_convert(buf, len);
936         }
937         return 0;
938 }
939
940
941
942 /*
943   do an unlocked scan of the hash table heads to find the next non-zero head. The value
944   will then be confirmed with the lock held
945 */
946 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
947 {
948         u32 h = *chain;
949         if (tdb->map_ptr) {
950                 for (;h < tdb->header.hash_size;h++) {
951                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
952                                 break;
953                         }
954                 }
955         } else {
956                 u32 off=0;
957                 for (;h < tdb->header.hash_size;h++) {
958                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
959                                 break;
960                         }
961                 }
962         }
963         (*chain) = h;
964 }
965
966
967 int tdb_munmap(struct tdb_context *tdb)
968 {
969         if (tdb->flags & TDB_INTERNAL)
970                 return 0;
971
972 #ifdef HAVE_MMAP
973         if (tdb->map_ptr) {
974                 int ret = munmap(tdb->map_ptr, tdb->real_map_size);
975                 if (ret != 0)
976                         return ret;
977                 tdb->real_map_size = 0;
978         }
979 #endif
980         tdb->map_ptr = NULL;
981         return 0;
982 }
983
984 void tdb_mmap(struct tdb_context *tdb)
985 {
986         if (tdb->flags & TDB_INTERNAL)
987                 return;
988
989 #ifdef HAVE_MMAP
990         if (!(tdb->flags & TDB_NOMMAP)) {
991                 tdb->map_ptr = mmap(NULL, tdb->map_size,
992                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE),
993                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
994
995                 /*
996                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
997                  */
998
999                 if (tdb->map_ptr == MAP_FAILED) {
1000                         tdb->real_map_size = 0;
1001                         tdb->map_ptr = NULL;
1002                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1003                                  tdb->map_size, strerror(errno)));
1004                 }
1005                 tdb->real_map_size = tdb->map_size;
1006         } else {
1007                 tdb->map_ptr = NULL;
1008         }
1009 #else
1010         tdb->map_ptr = NULL;
1011 #endif
1012 }
1013
1014 /* expand a file.  we prefer to use ftruncate, as that is what posix
1015   says to use for mmap expansion */
1016 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1017 {
1018         char buf[1024];
1019
1020         if (tdb->read_only || tdb->traverse_read) {
1021                 tdb->ecode = TDB_ERR_RDONLY;
1022                 return -1;
1023         }
1024
1025         if (ftruncate(tdb->fd, size+addition) == -1) {
1026                 char b = 0;
1027                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1028                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1029                                  size+addition, strerror(errno)));
1030                         return -1;
1031                 }
1032         }
1033
1034         /* now fill the file with something. This ensures that the
1035            file isn't sparse, which would be very bad if we ran out of
1036            disk. This must be done with write, not via mmap */
1037         memset(buf, TDB_PAD_BYTE, sizeof(buf));
1038         while (addition) {
1039                 int n = addition>sizeof(buf)?sizeof(buf):addition;
1040                 int ret = pwrite(tdb->fd, buf, n, size);
1041                 if (ret != n) {
1042                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1043                                    n, strerror(errno)));
1044                         return -1;
1045                 }
1046                 addition -= n;
1047                 size += n;
1048         }
1049         return 0;
1050 }
1051
1052
1053 /* expand the database at least size bytes by expanding the underlying
1054    file and doing the mmap again if necessary */
1055 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1056 {
1057         struct list_struct rec;
1058         tdb_off_t offset;
1059
1060         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1061                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1062                 return -1;
1063         }
1064
1065         /* must know about any previous expansions by another process */
1066         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1067
1068         /* always make room for at least 10 more records, and round
1069            the database up to a multiple of the page size */
1070         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1071
1072         if (!(tdb->flags & TDB_INTERNAL))
1073                 tdb_munmap(tdb);
1074
1075         /*
1076          * We must ensure the file is unmapped before doing this
1077          * to ensure consistency with systems like OpenBSD where
1078          * writes and mmaps are not consistent.
1079          */
1080
1081         /* expand the file itself */
1082         if (!(tdb->flags & TDB_INTERNAL)) {
1083                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1084                         goto fail;
1085         }
1086
1087         tdb->map_size += size;
1088
1089         if (tdb->flags & TDB_INTERNAL) {
1090                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1091                                                     tdb->map_size);
1092                 if (!new_map_ptr) {
1093                         tdb->map_size -= size;
1094                         goto fail;
1095                 }
1096                 tdb->map_ptr = new_map_ptr;
1097         } else {
1098                 /*
1099                  * We must ensure the file is remapped before adding the space
1100                  * to ensure consistency with systems like OpenBSD where
1101                  * writes and mmaps are not consistent.
1102                  */
1103
1104                 /* We're ok if the mmap fails as we'll fallback to read/write */
1105                 tdb_mmap(tdb);
1106         }
1107
1108         /* form a new freelist record */
1109         memset(&rec,'\0',sizeof(rec));
1110         rec.rec_len = size - sizeof(rec);
1111
1112         /* link it into the free list */
1113         offset = tdb->map_size - size;
1114         if (tdb_free(tdb, offset, &rec) == -1)
1115                 goto fail;
1116
1117         tdb_unlock(tdb, -1, F_WRLCK);
1118         return 0;
1119  fail:
1120         tdb_unlock(tdb, -1, F_WRLCK);
1121         return -1;
1122 }
1123
1124 /* read/write a tdb_off_t */
1125 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1126 {
1127         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1128 }
1129
1130 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1131 {
1132         tdb_off_t off = *d;
1133         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1134 }
1135
1136
1137 /* read a lump of data, allocating the space for it */
1138 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1139 {
1140         unsigned char *buf;
1141
1142         /* some systems don't like zero length malloc */
1143         if (len == 0) {
1144                 len = 1;
1145         }
1146
1147         if (!(buf = (unsigned char *)malloc(len))) {
1148                 /* Ensure ecode is set for log fn. */
1149                 tdb->ecode = TDB_ERR_OOM;
1150                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1151                            len, strerror(errno)));
1152                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1153         }
1154         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1155                 SAFE_FREE(buf);
1156                 return NULL;
1157         }
1158         return buf;
1159 }
1160
1161 /* Give a piece of tdb data to a parser */
1162
1163 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1164                    tdb_off_t offset, tdb_len_t len,
1165                    int (*parser)(TDB_DATA key, TDB_DATA data,
1166                                  void *private_data),
1167                    void *private_data)
1168 {
1169         TDB_DATA data;
1170         int result;
1171
1172         data.dsize = len;
1173
1174         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1175                 /*
1176                  * Optimize by avoiding the malloc/memcpy/free, point the
1177                  * parser directly at the mmap area.
1178                  */
1179                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1180                         return -1;
1181                 }
1182                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1183                 return parser(key, data, private_data);
1184         }
1185
1186         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1187                 return -1;
1188         }
1189
1190         result = parser(key, data, private_data);
1191         free(data.dptr);
1192         return result;
1193 }
1194
1195 /* read/write a record */
1196 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1197 {
1198         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1199                 return -1;
1200         if (TDB_BAD_MAGIC(rec)) {
1201                 /* Ensure ecode is set for log fn. */
1202                 tdb->ecode = TDB_ERR_CORRUPT;
1203                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1204                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1205         }
1206         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1207 }
1208
1209 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1210 {
1211         struct list_struct r = *rec;
1212         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1213 }
1214
1215 static const struct tdb_methods io_methods = {
1216         tdb_read,
1217         tdb_write,
1218         tdb_next_hash_chain,
1219         tdb_oob,
1220         tdb_expand_file,
1221         tdb_brlock
1222 };
1223
1224 /*
1225   initialise the default methods table
1226 */
1227 void tdb_io_init(struct tdb_context *tdb)
1228 {
1229         tdb->methods = &io_methods;
1230 }
1231
1232 /* file: transaction.c */
1233
1234 /*
1235   transaction design:
1236
1237   - only allow a single transaction at a time per database. This makes
1238     using the transaction API simpler, as otherwise the caller would
1239     have to cope with temporary failures in transactions that conflict
1240     with other current transactions
1241
1242   - keep the transaction recovery information in the same file as the
1243     database, using a special 'transaction recovery' record pointed at
1244     by the header. This removes the need for extra journal files as
1245     used by some other databases
1246
1247   - dynamically allocated the transaction recover record, re-using it
1248     for subsequent transactions. If a larger record is needed then
1249     tdb_free() the old record to place it on the normal tdb freelist
1250     before allocating the new record
1251
1252   - during transactions, keep a linked list of writes all that have
1253     been performed by intercepting all tdb_write() calls. The hooked
1254     transaction versions of tdb_read() and tdb_write() check this
1255     linked list and try to use the elements of the list in preference
1256     to the real database.
1257
1258   - don't allow any locks to be held when a transaction starts,
1259     otherwise we can end up with deadlock (plus lack of lock nesting
1260     in posix locks would mean the lock is lost)
1261
1262   - if the caller gains a lock during the transaction but doesn't
1263     release it then fail the commit
1264
1265   - allow for nested calls to tdb_transaction_start(), re-using the
1266     existing transaction record. If the inner transaction is cancelled
1267     then a subsequent commit will fail
1268
1269   - keep a mirrored copy of the tdb hash chain heads to allow for the
1270     fast hash heads scan on traverse, updating the mirrored copy in
1271     the transaction version of tdb_write
1272
1273   - allow callers to mix transaction and non-transaction use of tdb,
1274     although once a transaction is started then an exclusive lock is
1275     gained until the transaction is committed or cancelled
1276
1277   - the commit stategy involves first saving away all modified data
1278     into a linearised buffer in the transaction recovery area, then
1279     marking the transaction recovery area with a magic value to
1280     indicate a valid recovery record. In total 4 fsync/msync calls are
1281     needed per commit to prevent race conditions. It might be possible
1282     to reduce this to 3 or even 2 with some more work.
1283
1284   - check for a valid recovery record on open of the tdb, while the
1285     global lock is held. Automatically recover from the transaction
1286     recovery area if needed, then continue with the open as
1287     usual. This allows for smooth crash recovery with no administrator
1288     intervention.
1289
1290   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1291     still available, but no transaction recovery area is used and no
1292     fsync/msync calls are made.
1293
1294 */
1295
1296 struct tdb_transaction_el {
1297         struct tdb_transaction_el *next, *prev;
1298         tdb_off_t offset;
1299         tdb_len_t length;
1300         unsigned char *data;
1301 };
1302
1303 /*
1304   hold the context of any current transaction
1305 */
1306 struct tdb_transaction {
1307         /* we keep a mirrored copy of the tdb hash heads here so
1308            tdb_next_hash_chain() can operate efficiently */
1309         u32 *hash_heads;
1310
1311         /* the original io methods - used to do IOs to the real db */
1312         const struct tdb_methods *io_methods;
1313
1314         /* the list of transaction elements. We use a doubly linked
1315            list with a last pointer to allow us to keep the list
1316            ordered, with first element at the front of the list. It
1317            needs to be doubly linked as the read/write traversals need
1318            to be backwards, while the commit needs to be forwards */
1319         struct tdb_transaction_el *elements, *elements_last;
1320
1321         /* non-zero when an internal transaction error has
1322            occurred. All write operations will then fail until the
1323            transaction is ended */
1324         int transaction_error;
1325
1326         /* when inside a transaction we need to keep track of any
1327            nested tdb_transaction_start() calls, as these are allowed,
1328            but don't create a new transaction */
1329         int nesting;
1330
1331         /* old file size before transaction */
1332         tdb_len_t old_map_size;
1333 };
1334
1335
1336 /*
1337   read while in a transaction. We need to check first if the data is in our list
1338   of transaction elements, then if not do a real read
1339 */
1340 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1341                             tdb_len_t len, int cv)
1342 {
1343         struct tdb_transaction_el *el;
1344
1345         /* we need to walk the list backwards to get the most recent data */
1346         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1347                 tdb_len_t partial;
1348
1349                 if (off+len <= el->offset) {
1350                         continue;
1351                 }
1352                 if (off >= el->offset + el->length) {
1353                         continue;
1354                 }
1355
1356                 /* an overlapping read - needs to be split into up to
1357                    2 reads and a memcpy */
1358                 if (off < el->offset) {
1359                         partial = el->offset - off;
1360                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1361                                 goto fail;
1362                         }
1363                         len -= partial;
1364                         off += partial;
1365                         buf = (void *)(partial + (char *)buf);
1366                 }
1367                 if (off + len <= el->offset + el->length) {
1368                         partial = len;
1369                 } else {
1370                         partial = el->offset + el->length - off;
1371                 }
1372                 memcpy(buf, el->data + (off - el->offset), partial);
1373                 if (cv) {
1374                         tdb_convert(buf, len);
1375                 }
1376                 len -= partial;
1377                 off += partial;
1378                 buf = (void *)(partial + (char *)buf);
1379
1380                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1381                         goto fail;
1382                 }
1383
1384                 return 0;
1385         }
1386
1387         /* its not in the transaction elements - do a real read */
1388         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1389
1390 fail:
1391         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1392         tdb->ecode = TDB_ERR_IO;
1393         tdb->transaction->transaction_error = 1;
1394         return -1;
1395 }
1396
1397
1398 /*
1399   write while in a transaction
1400 */
1401 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1402                              const void *buf, tdb_len_t len)
1403 {
1404         struct tdb_transaction_el *el, *best_el=NULL;
1405
1406         if (len == 0) {
1407                 return 0;
1408         }
1409
1410         /* if the write is to a hash head, then update the transaction
1411            hash heads */
1412         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1413             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1414                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1415                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1416         }
1417
1418         /* first see if we can replace an existing entry */
1419         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1420                 tdb_len_t partial;
1421
1422                 if (best_el == NULL && off == el->offset+el->length) {
1423                         best_el = el;
1424                 }
1425
1426                 if (off+len <= el->offset) {
1427                         continue;
1428                 }
1429                 if (off >= el->offset + el->length) {
1430                         continue;
1431                 }
1432
1433                 /* an overlapping write - needs to be split into up to
1434                    2 writes and a memcpy */
1435                 if (off < el->offset) {
1436                         partial = el->offset - off;
1437                         if (transaction_write(tdb, off, buf, partial) != 0) {
1438                                 goto fail;
1439                         }
1440                         len -= partial;
1441                         off += partial;
1442                         buf = (const void *)(partial + (const char *)buf);
1443                 }
1444                 if (off + len <= el->offset + el->length) {
1445                         partial = len;
1446                 } else {
1447                         partial = el->offset + el->length - off;
1448                 }
1449                 memcpy(el->data + (off - el->offset), buf, partial);
1450                 len -= partial;
1451                 off += partial;
1452                 buf = (const void *)(partial + (const char *)buf);
1453
1454                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1455                         goto fail;
1456                 }
1457
1458                 return 0;
1459         }
1460
1461         /* see if we can append the new entry to an existing entry */
1462         if (best_el && best_el->offset + best_el->length == off &&
1463             (off+len < tdb->transaction->old_map_size ||
1464              off > tdb->transaction->old_map_size)) {
1465                 unsigned char *data = best_el->data;
1466                 el = best_el;
1467                 el->data = (unsigned char *)realloc(el->data,
1468                                                     el->length + len);
1469                 if (el->data == NULL) {
1470                         tdb->ecode = TDB_ERR_OOM;
1471                         tdb->transaction->transaction_error = 1;
1472                         el->data = data;
1473                         return -1;
1474                 }
1475                 if (buf) {
1476                         memcpy(el->data + el->length, buf, len);
1477                 } else {
1478                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1479                 }
1480                 el->length += len;
1481                 return 0;
1482         }
1483
1484         /* add a new entry at the end of the list */
1485         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1486         if (el == NULL) {
1487                 tdb->ecode = TDB_ERR_OOM;
1488                 tdb->transaction->transaction_error = 1;
1489                 return -1;
1490         }
1491         el->next = NULL;
1492         el->prev = tdb->transaction->elements_last;
1493         el->offset = off;
1494         el->length = len;
1495         el->data = (unsigned char *)malloc(len);
1496         if (el->data == NULL) {
1497                 free(el);
1498                 tdb->ecode = TDB_ERR_OOM;
1499                 tdb->transaction->transaction_error = 1;
1500                 return -1;
1501         }
1502         if (buf) {
1503                 memcpy(el->data, buf, len);
1504         } else {
1505                 memset(el->data, TDB_PAD_BYTE, len);
1506         }
1507         if (el->prev) {
1508                 el->prev->next = el;
1509         } else {
1510                 tdb->transaction->elements = el;
1511         }
1512         tdb->transaction->elements_last = el;
1513         return 0;
1514
1515 fail:
1516         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1517         tdb->ecode = TDB_ERR_IO;
1518         tdb->transaction->transaction_error = 1;
1519         return -1;
1520 }
1521
1522 /*
1523   accelerated hash chain head search, using the cached hash heads
1524 */
1525 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1526 {
1527         u32 h = *chain;
1528         for (;h < tdb->header.hash_size;h++) {
1529                 /* the +1 takes account of the freelist */
1530                 if (0 != tdb->transaction->hash_heads[h+1]) {
1531                         break;
1532                 }
1533         }
1534         (*chain) = h;
1535 }
1536
1537 /*
1538   out of bounds check during a transaction
1539 */
1540 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1541 {
1542         if (len <= tdb->map_size) {
1543                 return 0;
1544         }
1545         return TDB_ERRCODE(TDB_ERR_IO, -1);
1546 }
1547
1548 /*
1549   transaction version of tdb_expand().
1550 */
1551 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1552                                    tdb_off_t addition)
1553 {
1554         /* add a write to the transaction elements, so subsequent
1555            reads see the zero data */
1556         if (transaction_write(tdb, size, NULL, addition) != 0) {
1557                 return -1;
1558         }
1559
1560         return 0;
1561 }
1562
1563 /*
1564   brlock during a transaction - ignore them
1565 */
1566 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1567                               int rw_type, int lck_type, int probe, size_t len)
1568 {
1569         return 0;
1570 }
1571
1572 static const struct tdb_methods transaction_methods = {
1573         transaction_read,
1574         transaction_write,
1575         transaction_next_hash_chain,
1576         transaction_oob,
1577         transaction_expand_file,
1578         transaction_brlock
1579 };
1580
1581
1582 /*
1583   start a tdb transaction. No token is returned, as only a single
1584   transaction is allowed to be pending per tdb_context
1585 */
1586 int tdb_transaction_start(struct tdb_context *tdb)
1587 {
1588         /* some sanity checks */
1589         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1590                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1591                 tdb->ecode = TDB_ERR_EINVAL;
1592                 return -1;
1593         }
1594
1595         /* cope with nested tdb_transaction_start() calls */
1596         if (tdb->transaction != NULL) {
1597                 tdb->transaction->nesting++;
1598                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1599                          tdb->transaction->nesting));
1600                 return 0;
1601         }
1602
1603         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1604                 /* the caller must not have any locks when starting a
1605                    transaction as otherwise we'll be screwed by lack
1606                    of nested locks in posix */
1607                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1608                 tdb->ecode = TDB_ERR_LOCK;
1609                 return -1;
1610         }
1611
1612         if (tdb->travlocks.next != NULL) {
1613                 /* you cannot use transactions inside a traverse (although you can use
1614                    traverse inside a transaction) as otherwise you can end up with
1615                    deadlock */
1616                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1617                 tdb->ecode = TDB_ERR_LOCK;
1618                 return -1;
1619         }
1620
1621         tdb->transaction = (struct tdb_transaction *)
1622                 calloc(sizeof(struct tdb_transaction), 1);
1623         if (tdb->transaction == NULL) {
1624                 tdb->ecode = TDB_ERR_OOM;
1625                 return -1;
1626         }
1627
1628         /* get the transaction write lock. This is a blocking lock. As
1629            discussed with Volker, there are a number of ways we could
1630            make this async, which we will probably do in the future */
1631         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1632                 SAFE_FREE(tdb->transaction);
1633                 return -1;
1634         }
1635
1636         /* get a read lock from the freelist to the end of file. This
1637            is upgraded to a write lock during the commit */
1638         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1639                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1640                 tdb->ecode = TDB_ERR_LOCK;
1641                 goto fail;
1642         }
1643
1644         /* setup a copy of the hash table heads so the hash scan in
1645            traverse can be fast */
1646         tdb->transaction->hash_heads = (u32 *)
1647                 calloc(tdb->header.hash_size+1, sizeof(u32));
1648         if (tdb->transaction->hash_heads == NULL) {
1649                 tdb->ecode = TDB_ERR_OOM;
1650                 goto fail;
1651         }
1652         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1653                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1654                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1655                 tdb->ecode = TDB_ERR_IO;
1656                 goto fail;
1657         }
1658
1659         /* make sure we know about any file expansions already done by
1660            anyone else */
1661         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1662         tdb->transaction->old_map_size = tdb->map_size;
1663
1664         /* finally hook the io methods, replacing them with
1665            transaction specific methods */
1666         tdb->transaction->io_methods = tdb->methods;
1667         tdb->methods = &transaction_methods;
1668
1669         /* by calling this transaction write here, we ensure that we don't grow the
1670            transaction linked list due to hash table updates */
1671         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1672                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1673                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1674                 tdb->ecode = TDB_ERR_IO;
1675                 tdb->methods = tdb->transaction->io_methods;
1676                 goto fail;
1677         }
1678
1679         return 0;
1680
1681 fail:
1682         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1683         tdb_transaction_unlock(tdb);
1684         SAFE_FREE(tdb->transaction->hash_heads);
1685         SAFE_FREE(tdb->transaction);
1686         return -1;
1687 }
1688
1689
1690 /*
1691   cancel the current transaction
1692 */
1693 int tdb_transaction_cancel(struct tdb_context *tdb)
1694 {
1695         if (tdb->transaction == NULL) {
1696                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1697                 return -1;
1698         }
1699
1700         if (tdb->transaction->nesting != 0) {
1701                 tdb->transaction->transaction_error = 1;
1702                 tdb->transaction->nesting--;
1703                 return 0;
1704         }
1705
1706         tdb->map_size = tdb->transaction->old_map_size;
1707
1708         /* free all the transaction elements */
1709         while (tdb->transaction->elements) {
1710                 struct tdb_transaction_el *el = tdb->transaction->elements;
1711                 tdb->transaction->elements = el->next;
1712                 free(el->data);
1713                 free(el);
1714         }
1715
1716         /* remove any global lock created during the transaction */
1717         if (tdb->global_lock.count != 0) {
1718                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1719                 tdb->global_lock.count = 0;
1720         }
1721
1722         /* remove any locks created during the transaction */
1723         if (tdb->num_locks != 0) {
1724                 int i;
1725                 for (i=0;i<tdb->num_lockrecs;i++) {
1726                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1727                                    F_UNLCK,F_SETLKW, 0, 1);
1728                 }
1729                 tdb->num_locks = 0;
1730                 tdb->num_lockrecs = 0;
1731                 SAFE_FREE(tdb->lockrecs);
1732         }
1733
1734         /* restore the normal io methods */
1735         tdb->methods = tdb->transaction->io_methods;
1736
1737         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1738         tdb_transaction_unlock(tdb);
1739         SAFE_FREE(tdb->transaction->hash_heads);
1740         SAFE_FREE(tdb->transaction);
1741
1742         return 0;
1743 }
1744
1745 /*
1746   sync to disk
1747 */
1748 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1749 {
1750         if (fsync(tdb->fd) != 0) {
1751                 tdb->ecode = TDB_ERR_IO;
1752                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1753                 return -1;
1754         }
1755 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
1756         if (tdb->map_ptr) {
1757                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1758                 if (msync(moffset + (char *)tdb->map_ptr,
1759                           length + (offset - moffset), MS_SYNC) != 0) {
1760                         tdb->ecode = TDB_ERR_IO;
1761                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1762                                  strerror(errno)));
1763                         return -1;
1764                 }
1765         }
1766 #endif
1767         return 0;
1768 }
1769
1770
1771 /*
1772   work out how much space the linearised recovery data will consume
1773 */
1774 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1775 {
1776         struct tdb_transaction_el *el;
1777         tdb_len_t recovery_size = 0;
1778
1779         recovery_size = sizeof(u32);
1780         for (el=tdb->transaction->elements;el;el=el->next) {
1781                 if (el->offset >= tdb->transaction->old_map_size) {
1782                         continue;
1783                 }
1784                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1785         }
1786
1787         return recovery_size;
1788 }
1789
1790 /*
1791   allocate the recovery area, or use an existing recovery area if it is
1792   large enough
1793 */
1794 static int tdb_recovery_allocate(struct tdb_context *tdb,
1795                                  tdb_len_t *recovery_size,
1796                                  tdb_off_t *recovery_offset,
1797                                  tdb_len_t *recovery_max_size)
1798 {
1799         struct list_struct rec;
1800         const struct tdb_methods *methods = tdb->transaction->io_methods;
1801         tdb_off_t recovery_head;
1802
1803         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1804                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1805                 return -1;
1806         }
1807
1808         rec.rec_len = 0;
1809
1810         if (recovery_head != 0 &&
1811             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1812                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1813                 return -1;
1814         }
1815
1816         *recovery_size = tdb_recovery_size(tdb);
1817
1818         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1819                 /* it fits in the existing area */
1820                 *recovery_max_size = rec.rec_len;
1821                 *recovery_offset = recovery_head;
1822                 return 0;
1823         }
1824
1825         /* we need to free up the old recovery area, then allocate a
1826            new one at the end of the file. Note that we cannot use
1827            tdb_allocate() to allocate the new one as that might return
1828            us an area that is being currently used (as of the start of
1829            the transaction) */
1830         if (recovery_head != 0) {
1831                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1832                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1833                         return -1;
1834                 }
1835         }
1836
1837         /* the tdb_free() call might have increased the recovery size */
1838         *recovery_size = tdb_recovery_size(tdb);
1839
1840         /* round up to a multiple of page size */
1841         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1842         *recovery_offset = tdb->map_size;
1843         recovery_head = *recovery_offset;
1844
1845         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1846                                      (tdb->map_size - tdb->transaction->old_map_size) +
1847                                      sizeof(rec) + *recovery_max_size) == -1) {
1848                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1849                 return -1;
1850         }
1851
1852         /* remap the file (if using mmap) */
1853         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1854
1855         /* we have to reset the old map size so that we don't try to expand the file
1856            again in the transaction commit, which would destroy the recovery area */
1857         tdb->transaction->old_map_size = tdb->map_size;
1858
1859         /* write the recovery header offset and sync - we can sync without a race here
1860            as the magic ptr in the recovery record has not been set */
1861         CONVERT(recovery_head);
1862         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1863                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1864                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1865                 return -1;
1866         }
1867
1868         return 0;
1869 }
1870
1871
1872 /*
1873   setup the recovery data that will be used on a crash during commit
1874 */
1875 static int transaction_setup_recovery(struct tdb_context *tdb,
1876                                       tdb_off_t *magic_offset)
1877 {
1878         struct tdb_transaction_el *el;
1879         tdb_len_t recovery_size;
1880         unsigned char *data, *p;
1881         const struct tdb_methods *methods = tdb->transaction->io_methods;
1882         struct list_struct *rec;
1883         tdb_off_t recovery_offset, recovery_max_size;
1884         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1885         u32 magic, tailer;
1886
1887         /*
1888           check that the recovery area has enough space
1889         */
1890         if (tdb_recovery_allocate(tdb, &recovery_size,
1891                                   &recovery_offset, &recovery_max_size) == -1) {
1892                 return -1;
1893         }
1894
1895         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1896         if (data == NULL) {
1897                 tdb->ecode = TDB_ERR_OOM;
1898                 return -1;
1899         }
1900
1901         rec = (struct list_struct *)data;
1902         memset(rec, 0, sizeof(*rec));
1903
1904         rec->magic    = 0;
1905         rec->data_len = recovery_size;
1906         rec->rec_len  = recovery_max_size;
1907         rec->key_len  = old_map_size;
1908         CONVERT(rec);
1909
1910         /* build the recovery data into a single blob to allow us to do a single
1911            large write, which should be more efficient */
1912         p = data + sizeof(*rec);
1913         for (el=tdb->transaction->elements;el;el=el->next) {
1914                 if (el->offset >= old_map_size) {
1915                         continue;
1916                 }
1917                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1918                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1919                         free(data);
1920                         tdb->ecode = TDB_ERR_CORRUPT;
1921                         return -1;
1922                 }
1923                 memcpy(p, &el->offset, 4);
1924                 memcpy(p+4, &el->length, 4);
1925                 if (DOCONV()) {
1926                         tdb_convert(p, 8);
1927                 }
1928                 /* the recovery area contains the old data, not the
1929                    new data, so we have to call the original tdb_read
1930                    method to get it */
1931                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1932                         free(data);
1933                         tdb->ecode = TDB_ERR_IO;
1934                         return -1;
1935                 }
1936                 p += 8 + el->length;
1937         }
1938
1939         /* and the tailer */
1940         tailer = sizeof(*rec) + recovery_max_size;
1941         memcpy(p, &tailer, 4);
1942         CONVERT(p);
1943
1944         /* write the recovery data to the recovery area */
1945         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1946                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1947                 free(data);
1948                 tdb->ecode = TDB_ERR_IO;
1949                 return -1;
1950         }
1951
1952         /* as we don't have ordered writes, we have to sync the recovery
1953            data before we update the magic to indicate that the recovery
1954            data is present */
1955         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1956                 free(data);
1957                 return -1;
1958         }
1959
1960         free(data);
1961
1962         magic = TDB_RECOVERY_MAGIC;
1963         CONVERT(magic);
1964
1965         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1966
1967         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1968                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1969                 tdb->ecode = TDB_ERR_IO;
1970                 return -1;
1971         }
1972
1973         /* ensure the recovery magic marker is on disk */
1974         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1975                 return -1;
1976         }
1977
1978         return 0;
1979 }
1980
1981 /*
1982   commit the current transaction
1983 */
1984 int tdb_transaction_commit(struct tdb_context *tdb)
1985 {
1986         const struct tdb_methods *methods;
1987         tdb_off_t magic_offset = 0;
1988         u32 zero = 0;
1989
1990         if (tdb->transaction == NULL) {
1991                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1992                 return -1;
1993         }
1994
1995         if (tdb->transaction->transaction_error) {
1996                 tdb->ecode = TDB_ERR_IO;
1997                 tdb_transaction_cancel(tdb);
1998                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1999                 return -1;
2000         }
2001
2002         if (tdb->transaction->nesting != 0) {
2003                 tdb->transaction->nesting--;
2004                 return 0;
2005         }
2006
2007         /* check for a null transaction */
2008         if (tdb->transaction->elements == NULL) {
2009                 tdb_transaction_cancel(tdb);
2010                 return 0;
2011         }
2012
2013         methods = tdb->transaction->io_methods;
2014
2015         /* if there are any locks pending then the caller has not
2016            nested their locks properly, so fail the transaction */
2017         if (tdb->num_locks || tdb->global_lock.count) {
2018                 tdb->ecode = TDB_ERR_LOCK;
2019                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2020                 tdb_transaction_cancel(tdb);
2021                 return -1;
2022         }
2023
2024         /* upgrade the main transaction lock region to a write lock */
2025         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2026                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2027                 tdb->ecode = TDB_ERR_LOCK;
2028                 tdb_transaction_cancel(tdb);
2029                 return -1;
2030         }
2031
2032         /* get the global lock - this prevents new users attaching to the database
2033            during the commit */
2034         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2035                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2036                 tdb->ecode = TDB_ERR_LOCK;
2037                 tdb_transaction_cancel(tdb);
2038                 return -1;
2039         }
2040
2041         if (!(tdb->flags & TDB_NOSYNC)) {
2042                 /* write the recovery data to the end of the file */
2043                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2044                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2045                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2046                         tdb_transaction_cancel(tdb);
2047                         return -1;
2048                 }
2049         }
2050
2051         /* expand the file to the new size if needed */
2052         if (tdb->map_size != tdb->transaction->old_map_size) {
2053                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2054                                              tdb->map_size -
2055                                              tdb->transaction->old_map_size) == -1) {
2056                         tdb->ecode = TDB_ERR_IO;
2057                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2058                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2059                         tdb_transaction_cancel(tdb);
2060                         return -1;
2061                 }
2062                 tdb->map_size = tdb->transaction->old_map_size;
2063                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2064         }
2065
2066         /* perform all the writes */
2067         while (tdb->transaction->elements) {
2068                 struct tdb_transaction_el *el = tdb->transaction->elements;
2069
2070                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2071                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2072
2073                         /* we've overwritten part of the data and
2074                            possibly expanded the file, so we need to
2075                            run the crash recovery code */
2076                         tdb->methods = methods;
2077                         tdb_transaction_recover(tdb);
2078
2079                         tdb_transaction_cancel(tdb);
2080                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2081
2082                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2083                         return -1;
2084                 }
2085                 tdb->transaction->elements = el->next;
2086                 free(el->data);
2087                 free(el);
2088         }
2089
2090         if (!(tdb->flags & TDB_NOSYNC)) {
2091                 /* ensure the new data is on disk */
2092                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2093                         return -1;
2094                 }
2095
2096                 /* remove the recovery marker */
2097                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2098                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2099                         return -1;
2100                 }
2101
2102                 /* ensure the recovery marker has been removed on disk */
2103                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2104                         return -1;
2105                 }
2106         }
2107
2108         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2109
2110         /*
2111           TODO: maybe write to some dummy hdr field, or write to magic
2112           offset without mmap, before the last sync, instead of the
2113           utime() call
2114         */
2115
2116         /* on some systems (like Linux 2.6.x) changes via mmap/msync
2117            don't change the mtime of the file, this means the file may
2118            not be backed up (as tdb rounding to block sizes means that
2119            file size changes are quite rare too). The following forces
2120            mtime changes when a transaction completes */
2121 #ifdef HAVE_UTIME
2122         utime(tdb->name, NULL);
2123 #endif
2124
2125         /* use a transaction cancel to free memory and remove the
2126            transaction locks */
2127         tdb_transaction_cancel(tdb);
2128         return 0;
2129 }
2130
2131
2132 /*
2133   recover from an aborted transaction. Must be called with exclusive
2134   database write access already established (including the global
2135   lock to prevent new processes attaching)
2136 */
2137 int tdb_transaction_recover(struct tdb_context *tdb)
2138 {
2139         tdb_off_t recovery_head, recovery_eof;
2140         unsigned char *data, *p;
2141         u32 zero = 0;
2142         struct list_struct rec;
2143
2144         /* find the recovery area */
2145         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2146                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2147                 tdb->ecode = TDB_ERR_IO;
2148                 return -1;
2149         }
2150
2151         if (recovery_head == 0) {
2152                 /* we have never allocated a recovery record */
2153                 return 0;
2154         }
2155
2156         /* read the recovery record */
2157         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2158                                    sizeof(rec), DOCONV()) == -1) {
2159                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2160                 tdb->ecode = TDB_ERR_IO;
2161                 return -1;
2162         }
2163
2164         if (rec.magic != TDB_RECOVERY_MAGIC) {
2165                 /* there is no valid recovery data */
2166                 return 0;
2167         }
2168
2169         if (tdb->read_only) {
2170                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2171                 tdb->ecode = TDB_ERR_CORRUPT;
2172                 return -1;
2173         }
2174
2175         recovery_eof = rec.key_len;
2176
2177         data = (unsigned char *)malloc(rec.data_len);
2178         if (data == NULL) {
2179                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2180                 tdb->ecode = TDB_ERR_OOM;
2181                 return -1;
2182         }
2183
2184         /* read the full recovery data */
2185         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2186                                    rec.data_len, 0) == -1) {
2187                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2188                 tdb->ecode = TDB_ERR_IO;
2189                 return -1;
2190         }
2191
2192         /* recover the file data */
2193         p = data;
2194         while (p+8 < data + rec.data_len) {
2195                 u32 ofs, len;
2196                 if (DOCONV()) {
2197                         tdb_convert(p, 8);
2198                 }
2199                 memcpy(&ofs, p, 4);
2200                 memcpy(&len, p+4, 4);
2201
2202                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2203                         free(data);
2204                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2205                         tdb->ecode = TDB_ERR_IO;
2206                         return -1;
2207                 }
2208                 p += 8 + len;
2209         }
2210
2211         free(data);
2212
2213         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2214                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2215                 tdb->ecode = TDB_ERR_IO;
2216                 return -1;
2217         }
2218
2219         /* if the recovery area is after the recovered eof then remove it */
2220         if (recovery_eof <= recovery_head) {
2221                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2222                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2223                         tdb->ecode = TDB_ERR_IO;
2224                         return -1;
2225                 }
2226         }
2227
2228         /* remove the recovery magic */
2229         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2230                           &zero) == -1) {
2231                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2232                 tdb->ecode = TDB_ERR_IO;
2233                 return -1;
2234         }
2235
2236         /* reduce the file size to the old size */
2237         tdb_munmap(tdb);
2238         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2239                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2240                 tdb->ecode = TDB_ERR_IO;
2241                 return -1;
2242         }
2243         tdb->map_size = recovery_eof;
2244         tdb_mmap(tdb);
2245
2246         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2247                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2248                 tdb->ecode = TDB_ERR_IO;
2249                 return -1;
2250         }
2251
2252         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2253                  recovery_eof));
2254
2255         /* all done */
2256         return 0;
2257 }
2258
2259 /* file: freelist.c */
2260
2261 /* read a freelist record and check for simple errors */
2262 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2263 {
2264         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2265                 return -1;
2266
2267         if (rec->magic == TDB_MAGIC) {
2268                 /* this happens when a app is showdown while deleting a record - we should
2269                    not completely fail when this happens */
2270                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2271                          rec->magic, off));
2272                 rec->magic = TDB_FREE_MAGIC;
2273                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2274                         return -1;
2275         }
2276
2277         if (rec->magic != TDB_FREE_MAGIC) {
2278                 /* Ensure ecode is set for log fn. */
2279                 tdb->ecode = TDB_ERR_CORRUPT;
2280                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2281                            rec->magic, off));
2282                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2283         }
2284         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2285                 return -1;
2286         return 0;
2287 }
2288
2289
2290
2291 /* Remove an element from the freelist.  Must have alloc lock. */
2292 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2293 {
2294         tdb_off_t last_ptr, i;
2295
2296         /* read in the freelist top */
2297         last_ptr = FREELIST_TOP;
2298         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2299                 if (i == off) {
2300                         /* We've found it! */
2301                         return tdb_ofs_write(tdb, last_ptr, &next);
2302                 }
2303                 /* Follow chain (next offset is at start of record) */
2304                 last_ptr = i;
2305         }
2306         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2307         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2308 }
2309
2310
2311 /* update a record tailer (must hold allocation lock) */
2312 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2313                          const struct list_struct *rec)
2314 {
2315         tdb_off_t totalsize;
2316
2317         /* Offset of tailer from record header */
2318         totalsize = sizeof(*rec) + rec->rec_len;
2319         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2320                          &totalsize);
2321 }
2322
2323 /* Add an element into the freelist. Merge adjacent records if
2324    neccessary. */
2325 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2326 {
2327         tdb_off_t right, left;
2328
2329         /* Allocation and tailer lock */
2330         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2331                 return -1;
2332
2333         /* set an initial tailer, so if we fail we don't leave a bogus record */
2334         if (update_tailer(tdb, offset, rec) != 0) {
2335                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2336                 goto fail;
2337         }
2338
2339         /* Look right first (I'm an Australian, dammit) */
2340         right = offset + sizeof(*rec) + rec->rec_len;
2341         if (right + sizeof(*rec) <= tdb->map_size) {
2342                 struct list_struct r;
2343
2344                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2345                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2346                         goto left;
2347                 }
2348
2349                 /* If it's free, expand to include it. */
2350                 if (r.magic == TDB_FREE_MAGIC) {
2351                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2352                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2353                                 goto left;
2354                         }
2355                         rec->rec_len += sizeof(r) + r.rec_len;
2356                 }
2357         }
2358
2359 left:
2360         /* Look left */
2361         left = offset - sizeof(tdb_off_t);
2362         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2363                 struct list_struct l;
2364                 tdb_off_t leftsize;
2365
2366                 /* Read in tailer and jump back to header */
2367                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2368                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2369                         goto update;
2370                 }
2371
2372                 /* it could be uninitialised data */
2373                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2374                         goto update;
2375                 }
2376
2377                 left = offset - leftsize;
2378
2379                 /* Now read in record */
2380                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2381                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2382                         goto update;
2383                 }
2384
2385                 /* If it's free, expand to include it. */
2386                 if (l.magic == TDB_FREE_MAGIC) {
2387                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2388                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2389                                 goto update;
2390                         } else {
2391                                 offset = left;
2392                                 rec->rec_len += leftsize;
2393                         }
2394                 }
2395         }
2396
2397 update:
2398         if (update_tailer(tdb, offset, rec) == -1) {
2399                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2400                 goto fail;
2401         }
2402
2403         /* Now, prepend to free list */
2404         rec->magic = TDB_FREE_MAGIC;
2405
2406         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2407             tdb_rec_write(tdb, offset, rec) == -1 ||
2408             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2409                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2410                 goto fail;
2411         }
2412
2413         /* And we're done. */
2414         tdb_unlock(tdb, -1, F_WRLCK);
2415         return 0;
2416
2417  fail:
2418         tdb_unlock(tdb, -1, F_WRLCK);
2419         return -1;
2420 }
2421
2422
2423 /*
2424    the core of tdb_allocate - called when we have decided which
2425    free list entry to use
2426  */
2427 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2428                                 struct list_struct *rec, tdb_off_t last_ptr)
2429 {
2430         struct list_struct newrec;
2431         tdb_off_t newrec_ptr;
2432
2433         memset(&newrec, '\0', sizeof(newrec));
2434
2435         /* found it - now possibly split it up  */
2436         if (rec->rec_len > length + MIN_REC_SIZE) {
2437                 /* Length of left piece */
2438                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2439
2440                 /* Right piece to go on free list */
2441                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2442                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2443
2444                 /* And left record is shortened */
2445                 rec->rec_len = length;
2446         } else {
2447                 newrec_ptr = 0;
2448         }
2449
2450         /* Remove allocated record from the free list */
2451         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2452                 return 0;
2453         }
2454
2455         /* Update header: do this before we drop alloc
2456            lock, otherwise tdb_free() might try to
2457            merge with us, thinking we're free.
2458            (Thanks Jeremy Allison). */
2459         rec->magic = TDB_MAGIC;
2460         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2461                 return 0;
2462         }
2463
2464         /* Did we create new block? */
2465         if (newrec_ptr) {
2466                 /* Update allocated record tailer (we
2467                    shortened it). */
2468                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2469                         return 0;
2470                 }
2471
2472                 /* Free new record */
2473                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2474                         return 0;
2475                 }
2476         }
2477
2478         /* all done - return the new record offset */
2479         return rec_ptr;
2480 }
2481
2482 /* allocate some space from the free list. The offset returned points
2483    to a unconnected list_struct within the database with room for at
2484    least length bytes of total data
2485
2486    0 is returned if the space could not be allocated
2487  */
2488 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2489 {
2490         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2491         struct {
2492                 tdb_off_t rec_ptr, last_ptr;
2493                 tdb_len_t rec_len;
2494         } bestfit;
2495
2496         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2497                 return 0;
2498
2499         /* Extra bytes required for tailer */
2500         length += sizeof(tdb_off_t);
2501
2502  again:
2503         last_ptr = FREELIST_TOP;
2504
2505         /* read in the freelist top */
2506         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2507                 goto fail;
2508
2509         bestfit.rec_ptr = 0;
2510         bestfit.last_ptr = 0;
2511         bestfit.rec_len = 0;
2512
2513         /*
2514            this is a best fit allocation strategy. Originally we used
2515            a first fit strategy, but it suffered from massive fragmentation
2516            issues when faced with a slowly increasing record size.
2517          */
2518         while (rec_ptr) {
2519                 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2520                         goto fail;
2521                 }
2522
2523                 if (rec->rec_len >= length) {
2524                         if (bestfit.rec_ptr == 0 ||
2525                             rec->rec_len < bestfit.rec_len) {
2526                                 bestfit.rec_len = rec->rec_len;
2527                                 bestfit.rec_ptr = rec_ptr;
2528                                 bestfit.last_ptr = last_ptr;
2529                                 /* consider a fit to be good enough if
2530                                    we aren't wasting more than half
2531                                    the space */
2532                                 if (bestfit.rec_len < 2*length) {
2533                                         break;
2534                                 }
2535                         }
2536                 }
2537
2538                 /* move to the next record */
2539                 last_ptr = rec_ptr;
2540                 rec_ptr = rec->next;
2541         }
2542
2543         if (bestfit.rec_ptr != 0) {
2544                 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2545                         goto fail;
2546                 }
2547
2548                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2549                 tdb_unlock(tdb, -1, F_WRLCK);
2550                 return newrec_ptr;
2551         }
2552
2553         /* we didn't find enough space. See if we can expand the
2554            database and if we can then try again */
2555         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2556                 goto again;
2557  fail:
2558         tdb_unlock(tdb, -1, F_WRLCK);
2559         return 0;
2560 }
2561
2562 /* file: freelistcheck.c */
2563
2564 /* Check the freelist is good and contains no loops.
2565    Very memory intensive - only do this as a consistency
2566    checker. Heh heh - uses an in memory tdb as the storage
2567    for the "seen" record list. For some reason this strikes
2568    me as extremely clever as I don't have to write another tree
2569    data structure implementation :-).
2570  */
2571
2572 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2573 {
2574         TDB_DATA key, data;
2575
2576         memset(&data, '\0', sizeof(data));
2577         key.dptr = (unsigned char *)&rec_ptr;
2578         key.dsize = sizeof(rec_ptr);
2579         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2580 }
2581
2582 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2583 {
2584         struct tdb_context *mem_tdb = NULL;
2585         struct list_struct rec;
2586         tdb_off_t rec_ptr, last_ptr;
2587         int ret = -1;
2588
2589         *pnum_entries = 0;
2590
2591         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2592                                 TDB_INTERNAL, O_RDWR, 0600);
2593         if (!mem_tdb) {
2594                 return -1;
2595         }
2596
2597         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2598                 tdb_close(mem_tdb);
2599                 return 0;
2600         }
2601
2602         last_ptr = FREELIST_TOP;
2603
2604         /* Store the FREELIST_TOP record. */
2605         if (seen_insert(mem_tdb, last_ptr) == -1) {
2606                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2607                 goto fail;
2608         }
2609
2610         /* read in the freelist top */
2611         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2612                 goto fail;
2613         }
2614
2615         while (rec_ptr) {
2616
2617                 /* If we can't store this record (we've seen it
2618                    before) then the free list has a loop and must
2619                    be corrupt. */
2620
2621                 if (seen_insert(mem_tdb, rec_ptr)) {
2622                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2623                         goto fail;
2624                 }
2625
2626                 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2627                         goto fail;
2628                 }
2629
2630                 /* move to the next record */
2631                 last_ptr = rec_ptr;
2632                 rec_ptr = rec.next;
2633                 *pnum_entries += 1;
2634         }
2635
2636         ret = 0;
2637
2638   fail:
2639
2640         tdb_close(mem_tdb);
2641         tdb_unlock(tdb, -1, F_WRLCK);
2642         return ret;
2643 }
2644
2645 /* file: traverse.c */
2646
2647 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2648 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2649                          struct list_struct *rec)
2650 {
2651         int want_next = (tlock->off != 0);
2652
2653         /* Lock each chain from the start one. */
2654         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2655                 if (!tlock->off && tlock->hash != 0) {
2656                         /* this is an optimisation for the common case where
2657                            the hash chain is empty, which is particularly
2658                            common for the use of tdb with ldb, where large
2659                            hashes are used. In that case we spend most of our
2660                            time in tdb_brlock(), locking empty hash chains.
2661
2662                            To avoid this, we do an unlocked pre-check to see
2663                            if the hash chain is empty before starting to look
2664                            inside it. If it is empty then we can avoid that
2665                            hash chain. If it isn't empty then we can't believe
2666                            the value we get back, as we read it without a
2667                            lock, so instead we get the lock and re-fetch the
2668                            value below.
2669
2670                            Notice that not doing this optimisation on the
2671                            first hash chain is critical. We must guarantee
2672                            that we have done at least one fcntl lock at the
2673                            start of a search to guarantee that memory is
2674                            coherent on SMP systems. If records are added by
2675                            others during the search then thats OK, and we
2676                            could possibly miss those with this trick, but we
2677                            could miss them anyway without this trick, so the
2678                            semantics don't change.
2679
2680                            With a non-indexed ldb search this trick gains us a
2681                            factor of around 80 in speed on a linux 2.6.x
2682                            system (testing using ldbtest).
2683                         */
2684                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2685                         if (tlock->hash == tdb->header.hash_size) {
2686                                 continue;
2687                         }
2688                 }
2689
2690                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2691                         return -1;
2692
2693                 /* No previous record?  Start at top of chain. */
2694                 if (!tlock->off) {
2695                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2696                                      &tlock->off) == -1)
2697                                 goto fail;
2698                 } else {
2699                         /* Otherwise unlock the previous record. */
2700                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2701                                 goto fail;
2702                 }
2703
2704                 if (want_next) {
2705                         /* We have offset of old record: grab next */
2706                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2707                                 goto fail;
2708                         tlock->off = rec->next;
2709                 }
2710
2711                 /* Iterate through chain */
2712                 while( tlock->off) {
2713                         tdb_off_t current;
2714                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2715                                 goto fail;
2716
2717                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2718                         if (tlock->off == rec->next) {
2719                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2720                                 goto fail;
2721                         }
2722
2723                         if (!TDB_DEAD(rec)) {
2724                                 /* Woohoo: we found one! */
2725                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2726                                         goto fail;
2727                                 return tlock->off;
2728                         }
2729
2730                         /* Try to clean dead ones from old traverses */
2731                         current = tlock->off;
2732                         tlock->off = rec->next;
2733                         if (!(tdb->read_only || tdb->traverse_read) &&
2734                             tdb_do_delete(tdb, current, rec) != 0)
2735                                 goto fail;
2736                 }
2737                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2738                 want_next = 0;
2739         }
2740         /* We finished iteration without finding anything */
2741         return TDB_ERRCODE(TDB_SUCCESS, 0);
2742
2743  fail:
2744         tlock->off = 0;
2745         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2746                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2747         return -1;
2748 }
2749
2750 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2751    return -1 on error or the record count traversed
2752    if fn is NULL then it is not called
2753    a non-zero return value from fn() indicates that the traversal should stop
2754   */
2755 static int tdb_traverse_internal(struct tdb_context *tdb,
2756                                  tdb_traverse_func fn, void *private_data,
2757                                  struct tdb_traverse_lock *tl)
2758 {
2759         TDB_DATA key, dbuf;
2760         struct list_struct rec;
2761         int ret, count = 0;
2762
2763         /* This was in the initializaton, above, but the IRIX compiler
2764          * did not like it.  crh
2765          */
2766         tl->next = tdb->travlocks.next;
2767
2768         /* fcntl locks don't stack: beware traverse inside traverse */
2769         tdb->travlocks.next = tl;
2770
2771         /* tdb_next_lock places locks on the record returned, and its chain */
2772         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2773                 count++;
2774                 /* now read the full record */
2775                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2776                                           rec.key_len + rec.data_len);
2777                 if (!key.dptr) {
2778                         ret = -1;
2779                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2780                                 goto out;
2781                         if (tdb_unlock_record(tdb, tl->off) != 0)
2782                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2783                         goto out;
2784                 }
2785                 key.dsize = rec.key_len;
2786                 dbuf.dptr = key.dptr + rec.key_len;
2787                 dbuf.dsize = rec.data_len;
2788
2789                 /* Drop chain lock, call out */
2790                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2791                         ret = -1;
2792                         SAFE_FREE(key.dptr);
2793                         goto out;
2794                 }
2795                 if (fn && fn(tdb, key, dbuf, private_data)) {
2796                         /* They want us to terminate traversal */
2797                         ret = count;
2798                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2799                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2800                                 ret = -1;
2801                         }
2802                         SAFE_FREE(key.dptr);
2803                         goto out;
2804                 }
2805                 SAFE_FREE(key.dptr);
2806         }
2807 out:
2808         tdb->travlocks.next = tl->next;
2809         if (ret < 0)
2810                 return -1;
2811         else
2812                 return count;
2813 }
2814
2815
2816 /*
2817   a write style traverse - temporarily marks the db read only
2818 */
2819 int tdb_traverse_read(struct tdb_context *tdb,
2820                       tdb_traverse_func fn, void *private_data)
2821 {
2822         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2823         int ret;
2824
2825         /* we need to get a read lock on the transaction lock here to
2826            cope with the lock ordering semantics of solaris10 */
2827         if (tdb_transaction_lock(tdb, F_RDLCK)) {
2828                 return -1;
2829         }
2830
2831         tdb->traverse_read++;
2832         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2833         tdb->traverse_read--;
2834
2835         tdb_transaction_unlock(tdb);
2836
2837         return ret;
2838 }
2839
2840 /*
2841   a write style traverse - needs to get the transaction lock to
2842   prevent deadlocks
2843 */
2844 int tdb_traverse(struct tdb_context *tdb,
2845                  tdb_traverse_func fn, void *private_data)
2846 {
2847         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2848         int ret;
2849
2850         if (tdb->read_only || tdb->traverse_read) {
2851                 return tdb_traverse_read(tdb, fn, private_data);
2852         }
2853
2854         if (tdb_transaction_lock(tdb, F_WRLCK)) {
2855                 return -1;
2856         }
2857
2858         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2859
2860         tdb_transaction_unlock(tdb);
2861
2862         return ret;
2863 }
2864
2865
2866 /* find the first entry in the database and return its key */
2867 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2868 {
2869         TDB_DATA key;
2870         struct list_struct rec;
2871
2872         /* release any old lock */
2873         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2874                 return tdb_null;
2875         tdb->travlocks.off = tdb->travlocks.hash = 0;
2876         tdb->travlocks.lock_rw = F_RDLCK;
2877
2878         /* Grab first record: locks chain and returned record. */
2879         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2880                 return tdb_null;
2881         /* now read the key */
2882         key.dsize = rec.key_len;
2883         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2884
2885         /* Unlock the hash chain of the record we just read. */
2886         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2887                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2888         return key;
2889 }
2890
2891 /* find the next entry in the database, returning its key */
2892 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2893 {
2894         u32 oldhash;
2895         TDB_DATA key = tdb_null;
2896         struct list_struct rec;
2897         unsigned char *k = NULL;
2898
2899         /* Is locked key the old key?  If so, traverse will be reliable. */
2900         if (tdb->travlocks.off) {
2901                 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2902                         return tdb_null;
2903                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2904                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2905                                             rec.key_len))
2906                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2907                         /* No, it wasn't: unlock it and start from scratch */
2908                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2909                                 SAFE_FREE(k);
2910                                 return tdb_null;
2911                         }
2912                         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2913                                 SAFE_FREE(k);
2914                                 return tdb_null;
2915                         }
2916                         tdb->travlocks.off = 0;
2917                 }
2918
2919                 SAFE_FREE(k);
2920         }
2921
2922         if (!tdb->travlocks.off) {
2923                 /* No previous element: do normal find, and lock record */
2924                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2925                 if (!tdb->travlocks.off)
2926                         return tdb_null;
2927                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2928                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2929                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2930                         return tdb_null;
2931                 }
2932         }
2933         oldhash = tdb->travlocks.hash;
2934
2935         /* Grab next record: locks chain and returned record,
2936            unlocks old record */
2937         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2938                 key.dsize = rec.key_len;
2939                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2940                                           key.dsize);
2941                 /* Unlock the chain of this new record */
2942                 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2943                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2944         }
2945         /* Unlock the chain of old record */
2946         if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2947                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2948         return key;
2949 }
2950
2951 /* file: dump.c */
2952
2953 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2954                                  tdb_off_t offset)
2955 {
2956         struct list_struct rec;
2957         tdb_off_t tailer_ofs, tailer;
2958
2959         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2960                                    sizeof(rec), DOCONV()) == -1) {
2961                 printf("ERROR: failed to read record at %u\n", offset);
2962                 return 0;
2963         }
2964
2965         printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2966                "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2967                hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2968                rec.full_hash, rec.magic);
2969
2970         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2971
2972         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2973                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2974                 return rec.next;
2975         }
2976
2977         if (tailer != rec.rec_len + sizeof(rec)) {
2978                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2979                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2980         }
2981         return rec.next;
2982 }
2983
2984 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2985 {
2986         tdb_off_t rec_ptr, top;
2987
2988         top = TDB_HASH_TOP(i);
2989
2990         if (tdb_lock(tdb, i, F_WRLCK) != 0)
2991                 return -1;
2992
2993         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2994                 return tdb_unlock(tdb, i, F_WRLCK);
2995
2996         if (rec_ptr)
2997                 printf("hash=%d\n", i);
2998
2999         while (rec_ptr) {
3000                 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
3001         }
3002
3003         return tdb_unlock(tdb, i, F_WRLCK);
3004 }
3005
3006 void tdb_dump_all(struct tdb_context *tdb)
3007 {
3008         int i;
3009         for (i=0;i<tdb->header.hash_size;i++) {
3010                 tdb_dump_chain(tdb, i);
3011         }
3012         printf("freelist:\n");
3013         tdb_dump_chain(tdb, -1);
3014 }
3015
3016 int tdb_printfreelist(struct tdb_context *tdb)
3017 {
3018         int ret;
3019         long total_free = 0;
3020         tdb_off_t offset, rec_ptr;
3021         struct list_struct rec;
3022
3023         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3024                 return ret;
3025
3026         offset = FREELIST_TOP;
3027
3028         /* read in the freelist top */
3029         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3030                 tdb_unlock(tdb, -1, F_WRLCK);
3031                 return 0;
3032         }
3033
3034         printf("freelist top=[0x%08x]\n", rec_ptr );
3035         while (rec_ptr) {
3036                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3037                                            sizeof(rec), DOCONV()) == -1) {
3038                         tdb_unlock(tdb, -1, F_WRLCK);
3039                         return -1;
3040                 }
3041
3042                 if (rec.magic != TDB_FREE_MAGIC) {
3043                         printf("bad magic 0x%08x in free list\n", rec.magic);
3044                         tdb_unlock(tdb, -1, F_WRLCK);
3045                         return -1;
3046                 }
3047
3048                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3049                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3050                 total_free += rec.rec_len;
3051
3052                 /* move to the next record */
3053                 rec_ptr = rec.next;
3054         }
3055         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3056                (int)total_free);
3057
3058         return tdb_unlock(tdb, -1, F_WRLCK);
3059 }
3060
3061 /* file: tdb.c */
3062
3063 /*
3064   non-blocking increment of the tdb sequence number if the tdb has been opened using
3065   the TDB_SEQNUM flag
3066 */
3067 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3068 {
3069         tdb_off_t seqnum=0;
3070
3071         if (!(tdb->flags & TDB_SEQNUM)) {
3072                 return;
3073         }
3074
3075         /* we ignore errors from this, as we have no sane way of
3076            dealing with them.
3077         */
3078         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3079         seqnum++;
3080         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3081 }
3082
3083 /*
3084   increment the tdb sequence number if the tdb has been opened using
3085   the TDB_SEQNUM flag
3086 */
3087 static void tdb_increment_seqnum(struct tdb_context *tdb)
3088 {
3089         if (!(tdb->flags & TDB_SEQNUM)) {
3090                 return;
3091         }
3092
3093         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3094                 return;
3095         }
3096
3097         tdb_increment_seqnum_nonblock(tdb);
3098
3099         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3100 }
3101
3102 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3103 {
3104         return memcmp(data.dptr, key.dptr, data.dsize);
3105 }
3106
3107 /* Returns 0 on fail.  On success, return offset of record, and fills
3108    in rec */
3109 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3110                         struct list_struct *r)
3111 {
3112         tdb_off_t rec_ptr;
3113
3114         /* read in the hash top */
3115         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3116                 return 0;
3117
3118         /* keep looking until we find the right record */
3119         while (rec_ptr) {
3120                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3121                         return 0;
3122
3123                 if (!TDB_DEAD(r) && hash==r->full_hash
3124                     && key.dsize==r->key_len
3125                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3126                                       r->key_len, tdb_key_compare,
3127                                       NULL) == 0) {
3128                         return rec_ptr;
3129                 }
3130                 rec_ptr = r->next;
3131         }
3132         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3133 }
3134
3135 /* As tdb_find, but if you succeed, keep the lock */
3136 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3137                            struct list_struct *rec)
3138 {
3139         u32 rec_ptr;
3140
3141         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3142                 return 0;
3143         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3144                 tdb_unlock(tdb, BUCKET(hash), locktype);
3145         return rec_ptr;
3146 }
3147
3148
3149 /* update an entry in place - this only works if the new data size
3150    is <= the old data size and the key exists.
3151    on failure return -1.
3152 */
3153 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3154 {
3155         struct list_struct rec;
3156         tdb_off_t rec_ptr;
3157
3158         /* find entry */
3159         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3160                 return -1;
3161
3162         /* must be long enough key, data and tailer */
3163         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3164                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3165                 return -1;
3166         }
3167
3168         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3169                       dbuf.dptr, dbuf.dsize) == -1)
3170                 return -1;
3171
3172         if (dbuf.dsize != rec.data_len) {
3173                 /* update size */
3174                 rec.data_len = dbuf.dsize;
3175                 return tdb_rec_write(tdb, rec_ptr, &rec);
3176         }
3177
3178         return 0;
3179 }
3180
3181 /* find an entry in the database given a key */
3182 /* If an entry doesn't exist tdb_err will be set to
3183  * TDB_ERR_NOEXIST. If a key has no data attached
3184  * then the TDB_DATA will have zero length but
3185  * a non-zero pointer
3186  */
3187 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3188 {
3189         tdb_off_t rec_ptr;
3190         struct list_struct rec;
3191         TDB_DATA ret;
3192         u32 hash;
3193
3194         /* find which hash bucket it is in */
3195         hash = tdb->hash_fn(&key);
3196         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3197                 return tdb_null;
3198
3199         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3200                                   rec.data_len);
3201         ret.dsize = rec.data_len;
3202         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3203         return ret;
3204 }
3205
3206 /*
3207  * Find an entry in the database and hand the record's data to a parsing
3208  * function. The parsing function is executed under the chain read lock, so it
3209  * should be fast and should not block on other syscalls.
3210  *
3211  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3212  *
3213  * For mmapped tdb's that do not have a transaction open it points the parsing
3214  * function directly at the mmap area, it avoids the malloc/memcpy in this
3215  * case. If a transaction is open or no mmap is available, it has to do
3216  * malloc/read/parse/free.
3217  *
3218  * This is interesting for all readers of potentially large data structures in
3219  * the tdb records, ldb indexes being one example.
3220  */
3221
3222 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3223                      int (*parser)(TDB_DATA key, TDB_DATA data,
3224                                    void *private_data),
3225                      void *private_data)
3226 {
3227         tdb_off_t rec_ptr;
3228         struct list_struct rec;
3229         int ret;
3230         u32 hash;
3231
3232         /* find which hash bucket it is in */
3233         hash = tdb->hash_fn(&key);
3234
3235         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3236                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3237         }
3238
3239         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3240                              rec.data_len, parser, private_data);
3241
3242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3243
3244         return ret;
3245 }
3246
3247 /* check if an entry in the database exists
3248
3249    note that 1 is returned if the key is found and 0 is returned if not found
3250    this doesn't match the conventions in the rest of this module, but is
3251    compatible with gdbm
3252 */
3253 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3254 {
3255         struct list_struct rec;
3256
3257         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3258                 return 0;
3259         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3260         return 1;
3261 }
3262
3263 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3264 {
3265         u32 hash = tdb->hash_fn(&key);
3266         return tdb_exists_hash(tdb, key, hash);
3267 }
3268
3269 /* actually delete an entry in the database given the offset */
3270 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3271 {
3272         tdb_off_t last_ptr, i;
3273         struct list_struct lastrec;
3274
3275         if (tdb->read_only || tdb->traverse_read) return -1;
3276
3277         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3278                 /* Someone traversing here: mark it as dead */
3279                 rec->magic = TDB_DEAD_MAGIC;
3280                 return tdb_rec_write(tdb, rec_ptr, rec);
3281         }
3282         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3283                 return -1;
3284
3285         /* find previous record in hash chain */
3286         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3287                 return -1;
3288         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3289                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3290                         return -1;
3291
3292         /* unlink it: next ptr is at start of record. */
3293         if (last_ptr == 0)
3294                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3295         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3296                 return -1;
3297
3298         /* recover the space */
3299         if (tdb_free(tdb, rec_ptr, rec) == -1)
3300                 return -1;
3301         return 0;
3302 }
3303
3304 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3305 {
3306         int res = 0;
3307         tdb_off_t rec_ptr;
3308         struct list_struct rec;
3309
3310         /* read in the hash top */
3311         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3312                 return 0;
3313
3314         while (rec_ptr) {
3315                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3316                         return 0;
3317
3318                 if (rec.magic == TDB_DEAD_MAGIC) {
3319                         res += 1;
3320                 }
3321                 rec_ptr = rec.next;
3322         }
3323         return res;
3324 }
3325
3326 /*
3327  * Purge all DEAD records from a hash chain
3328  */
3329 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3330 {
3331         int res = -1;
3332         struct list_struct rec;
3333         tdb_off_t rec_ptr;
3334
3335         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3336                 return -1;
3337         }
3338
3339         /* read in the hash top */
3340         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3341                 goto fail;
3342
3343         while (rec_ptr) {
3344                 tdb_off_t next;
3345
3346                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3347                         goto fail;
3348                 }
3349
3350                 next = rec.next;
3351
3352                 if (rec.magic == TDB_DEAD_MAGIC
3353                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3354                         goto fail;
3355                 }
3356                 rec_ptr = next;
3357         }
3358         res = 0;
3359  fail:
3360         tdb_unlock(tdb, -1, F_WRLCK);
3361         return res;
3362 }
3363
3364 /* delete an entry in the database given a key */
3365 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3366 {
3367         tdb_off_t rec_ptr;
3368         struct list_struct rec;
3369         int ret;
3370
3371         if (tdb->max_dead_records != 0) {
3372
3373                 /*
3374                  * Allow for some dead records per hash chain, mainly for
3375                  * tdb's with a very high create/delete rate like locking.tdb.
3376                  */
3377
3378                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3379                         return -1;
3380
3381                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3382                         /*
3383                          * Don't let the per-chain freelist grow too large,
3384                          * delete all existing dead records
3385                          */
3386                         tdb_purge_dead(tdb, hash);
3387                 }
3388
3389                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3390                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3391                         return -1;
3392                 }
3393
3394                 /*
3395                  * Just mark the record as dead.
3396                  */
3397                 rec.magic = TDB_DEAD_MAGIC;
3398                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3399         }
3400         else {
3401                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3402                                                    &rec)))
3403                         return -1;
3404
3405                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3406         }
3407
3408         if (ret == 0) {
3409                 tdb_increment_seqnum(tdb);
3410         }
3411
3412         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3413                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3414         return ret;
3415 }
3416
3417 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3418 {
3419         u32 hash = tdb->hash_fn(&key);
3420         return tdb_delete_hash(tdb, key, hash);
3421 }
3422
3423 /*
3424  * See if we have a dead record around with enough space
3425  */
3426 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3427                                struct list_struct *r, tdb_len_t length)
3428 {
3429         tdb_off_t rec_ptr;
3430
3431         /* read in the hash top */
3432         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3433                 return 0;
3434
3435         /* keep looking until we find the right record */
3436         while (rec_ptr) {
3437                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3438                         return 0;
3439
3440                 if (TDB_DEAD(r) && r->rec_len >= length) {
3441                         /*
3442                          * First fit for simple coding, TODO: change to best
3443                          * fit
3444                          */
3445                         return rec_ptr;
3446                 }
3447                 rec_ptr = r->next;
3448         }
3449         return 0;
3450 }
3451
3452 /* store an element in the database, replacing any existing element
3453    with the same key
3454
3455    return 0 on success, -1 on failure
3456 */
3457 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3458 {
3459         struct list_struct rec;
3460         u32 hash;
3461         tdb_off_t rec_ptr;
3462         char *p = NULL;
3463         int ret = -1;
3464
3465         if (tdb->read_only || tdb->traverse_read) {
3466                 tdb->ecode = TDB_ERR_RDONLY;
3467                 return -1;
3468         }
3469
3470         /* find which hash bucket it is in */
3471         hash = tdb->hash_fn(&key);
3472         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3473                 return -1;
3474
3475         /* check for it existing, on insert. */
3476         if (flag == TDB_INSERT) {
3477                 if (tdb_exists_hash(tdb, key, hash)) {
3478                         tdb->ecode = TDB_ERR_EXISTS;
3479                         goto fail;
3480                 }
3481         } else {
3482                 /* first try in-place update, on modify or replace. */
3483                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3484                         goto done;
3485                 }
3486                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3487                     flag == TDB_MODIFY) {
3488                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3489                          we should fail the store */
3490                         goto fail;
3491                 }
3492         }
3493         /* reset the error code potentially set by the tdb_update() */
3494         tdb->ecode = TDB_SUCCESS;
3495
3496         /* delete any existing record - if it doesn't exist we don't
3497            care.  Doing this first reduces fragmentation, and avoids
3498            coalescing with `allocated' block before it's updated. */
3499         if (flag != TDB_INSERT)
3500                 tdb_delete_hash(tdb, key, hash);
3501
3502         /* Copy key+value *before* allocating free space in case malloc
3503            fails and we are left with a dead spot in the tdb. */
3504
3505         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3506                 tdb->ecode = TDB_ERR_OOM;
3507                 goto fail;
3508         }
3509
3510         memcpy(p, key.dptr, key.dsize);
3511         if (dbuf.dsize)
3512                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3513
3514         if (tdb->max_dead_records != 0) {
3515                 /*
3516                  * Allow for some dead records per hash chain, look if we can
3517                  * find one that can hold the new record. We need enough space
3518                  * for key, data and tailer. If we find one, we don't have to
3519                  * consult the central freelist.
3520                  */
3521                 rec_ptr = tdb_find_dead(
3522                         tdb, hash, &rec,
3523                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3524
3525                 if (rec_ptr != 0) {
3526                         rec.key_len = key.dsize;
3527                         rec.data_len = dbuf.dsize;
3528                         rec.full_hash = hash;
3529                         rec.magic = TDB_MAGIC;
3530                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3531                             || tdb->methods->tdb_write(
3532                                     tdb, rec_ptr + sizeof(rec),
3533                                     p, key.dsize + dbuf.dsize) == -1) {
3534                                 goto fail;
3535                         }
3536                         goto done;
3537                 }
3538         }
3539
3540         /*
3541          * We have to allocate some space from the freelist, so this means we
3542          * have to lock it. Use the chance to purge all the DEAD records from
3543          * the hash chain under the freelist lock.
3544          */
3545
3546         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3547                 goto fail;
3548         }
3549
3550         if ((tdb->max_dead_records != 0)
3551             && (tdb_purge_dead(tdb, hash) == -1)) {
3552                 tdb_unlock(tdb, -1, F_WRLCK);
3553                 goto fail;
3554         }
3555
3556         /* we have to allocate some space */
3557         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3558
3559         tdb_unlock(tdb, -1, F_WRLCK);
3560
3561         if (rec_ptr == 0) {
3562                 goto fail;
3563         }
3564
3565         /* Read hash top into next ptr */
3566         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3567                 goto fail;
3568
3569         rec.key_len = key.dsize;
3570         rec.data_len = dbuf.dsize;
3571         rec.full_hash = hash;
3572         rec.magic = TDB_MAGIC;
3573
3574         /* write out and point the top of the hash chain at it */
3575         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3576             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3577             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3578                 /* Need to tdb_unallocate() here */
3579                 goto fail;
3580         }
3581
3582  done:
3583         ret = 0;
3584  fail:
3585         if (ret == 0) {
3586                 tdb_increment_seqnum(tdb);
3587         }
3588
3589         SAFE_FREE(p);
3590         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3591         return ret;
3592 }
3593
3594
3595 /* Append to an entry. Create if not exist. */
3596 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3597 {
3598         u32 hash;
3599         TDB_DATA dbuf;
3600         int ret = -1;
3601
3602         /* find which hash bucket it is in */
3603         hash = tdb->hash_fn(&key);
3604         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3605                 return -1;
3606
3607         dbuf = tdb_fetch(tdb, key);
3608
3609         if (dbuf.dptr == NULL) {
3610                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3611         } else {
3612                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3613                                                      dbuf.dsize + new_dbuf.dsize);
3614                 if (new_dptr == NULL) {
3615                         free(dbuf.dptr);
3616                 }
3617                 dbuf.dptr = new_dptr;
3618         }
3619
3620         if (dbuf.dptr == NULL) {
3621                 tdb->ecode = TDB_ERR_OOM;
3622                 goto failed;
3623         }
3624
3625         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3626         dbuf.dsize += new_dbuf.dsize;
3627
3628         ret = tdb_store(tdb, key, dbuf, 0);
3629
3630 failed:
3631         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3632         SAFE_FREE(dbuf.dptr);
3633         return ret;
3634 }
3635
3636
3637 /*
3638   return the name of the current tdb file
3639   useful for external logging functions
3640 */
3641 const char *tdb_name(struct tdb_context *tdb)
3642 {
3643         return tdb->name;
3644 }
3645
3646 /*
3647   return the underlying file descriptor being used by tdb, or -1
3648   useful for external routines that want to check the device/inode
3649   of the fd
3650 */
3651 int tdb_fd(struct tdb_context *tdb)
3652 {
3653         return tdb->fd;
3654 }
3655
3656 /*
3657   return the current logging function
3658   useful for external tdb routines that wish to log tdb errors
3659 */
3660 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3661 {
3662         return tdb->log.log_fn;
3663 }
3664
3665
3666 /*
3667   get the tdb sequence number. Only makes sense if the writers opened
3668   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3669   quickly, so it should only be used for a 'has something changed'
3670   test, not for code that relies on the count of the number of changes
3671   made. If you want a counter then use a tdb record.
3672
3673   The aim of this sequence number is to allow for a very lightweight
3674   test of a possible tdb change.
3675 */
3676 int tdb_get_seqnum(struct tdb_context *tdb)
3677 {
3678         tdb_off_t seqnum=0;
3679
3680         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3681         return seqnum;
3682 }
3683
3684 int tdb_hash_size(struct tdb_context *tdb)
3685 {
3686         return tdb->header.hash_size;
3687 }
3688
3689 size_t tdb_map_size(struct tdb_context *tdb)
3690 {
3691         return tdb->map_size;
3692 }
3693
3694 int tdb_get_flags(struct tdb_context *tdb)
3695 {
3696         return tdb->flags;
3697 }
3698
3699
3700 /*
3701   enable sequence number handling on an open tdb
3702 */
3703 void tdb_enable_seqnum(struct tdb_context *tdb)
3704 {
3705         tdb->flags |= TDB_SEQNUM;
3706 }
3707
3708 /* file: open.c */
3709
3710 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3711 static struct tdb_context *tdbs = NULL;
3712
3713
3714 /* This is from a hash algorithm suggested by Rogier Wolff */
3715 static unsigned int default_tdb_hash(TDB_DATA *key)
3716 {
3717         u32 value;      /* Used to compute the hash value.  */
3718         u32   i;        /* Used to cycle through random values. */
3719
3720         /* Set the initial value from the key size. */
3721         for (value = 0, i=0; i < key->dsize; i++)
3722                 value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3723
3724         return value;
3725 }
3726
3727
3728 /* initialise a new database with a specified hash size */
3729 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3730 {
3731         struct tdb_header *newdb;
3732         int size, ret = -1;
3733
3734         /* We make it up in memory, then write it out if not internal */
3735         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3736         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3737                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3738
3739         /* Fill in the header */
3740         newdb->version = TDB_VERSION;
3741         newdb->hash_size = hash_size;
3742         if (tdb->flags & TDB_INTERNAL) {
3743                 tdb->map_size = size;
3744                 tdb->map_ptr = (char *)newdb;
3745                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3746                 /* Convert the `ondisk' version if asked. */
3747                 CONVERT(*newdb);
3748                 return 0;
3749         }
3750         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3751                 goto fail;
3752
3753         if (ftruncate(tdb->fd, 0) == -1)
3754                 goto fail;
3755
3756         /* This creates an endian-converted header, as if read from disk */
3757         CONVERT(*newdb);
3758         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3759         /* Don't endian-convert the magic food! */
3760         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3761         if (write(tdb->fd, newdb, size) != size) {
3762                 ret = -1;
3763         } else {
3764                 ret = 0;
3765         }
3766
3767   fail:
3768         SAFE_FREE(newdb);
3769         return ret;
3770 }
3771
3772
3773
3774 static int tdb_already_open(dev_t device,
3775                             ino_t ino)
3776 {
3777         struct tdb_context *i;
3778
3779         for (i = tdbs; i; i = i->next) {
3780                 if (i->device == device && i->inode == ino) {
3781                         return 1;
3782                 }
3783         }
3784
3785         return 0;
3786 }
3787
3788 /* open the database, creating it if necessary
3789
3790    The open_flags and mode are passed straight to the open call on the
3791    database file. A flags value of O_WRONLY is invalid. The hash size
3792    is advisory, use zero for a default value.
3793
3794    Return is NULL on error, in which case errno is also set.  Don't
3795    try to call tdb_error or tdb_errname, just do strerror(errno).
3796
3797    @param name may be NULL for internal databases. */
3798 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3799                       int open_flags, mode_t mode)
3800 {
3801         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3802 }
3803
3804 /* a default logging function */
3805 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3806 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3807 {
3808 }
3809
3810
3811 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3812                                 int open_flags, mode_t mode,
3813                                 const struct tdb_logging_context *log_ctx,
3814                                 tdb_hash_func hash_fn)
3815 {
3816         struct tdb_context *tdb;
3817         struct stat st;
3818         int rev = 0, locked = 0;
3819         unsigned char *vp;
3820         u32 vertest;
3821
3822         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3823                 /* Can't log this */
3824                 errno = ENOMEM;
3825                 goto fail;
3826         }
3827         tdb_io_init(tdb);
3828         tdb->fd = -1;
3829         tdb->name = NULL;
3830         tdb->map_ptr = NULL;
3831         tdb->flags = tdb_flags;
3832         tdb->open_flags = open_flags;
3833         if (log_ctx) {
3834                 tdb->log = *log_ctx;
3835         } else {
3836                 tdb->log.log_fn = null_log_fn;
3837                 tdb->log.log_private = NULL;
3838         }
3839         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3840
3841         /* cache the page size */
3842         tdb->page_size = sysconf(_SC_PAGESIZE);
3843         if (tdb->page_size <= 0) {
3844                 tdb->page_size = 0x2000;
3845         }
3846
3847         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3848                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3849                          name));
3850                 errno = EINVAL;
3851                 goto fail;
3852         }
3853
3854         if (hash_size == 0)
3855                 hash_size = DEFAULT_HASH_SIZE;
3856         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3857                 tdb->read_only = 1;
3858                 /* read only databases don't do locking or clear if first */
3859                 tdb->flags |= TDB_NOLOCK;
3860                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3861         }
3862
3863         /* internal databases don't mmap or lock, and start off cleared */
3864         if (tdb->flags & TDB_INTERNAL) {
3865                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3866                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3867                 if (tdb_new_database(tdb, hash_size) != 0) {
3868                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3869                         goto fail;
3870                 }
3871                 goto internal;
3872         }
3873
3874         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3875                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3876                          name, strerror(errno)));
3877                 goto fail;      /* errno set by open(2) */
3878         }
3879
3880         /* ensure there is only one process initialising at once */
3881         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3882                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3883                          name, strerror(errno)));
3884                 goto fail;      /* errno set by tdb_brlock */
3885         }
3886
3887         /* we need to zero database if we are the only one with it open */
3888         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3889             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3890                 open_flags |= O_CREAT;
3891                 if (ftruncate(tdb->fd, 0) == -1) {
3892                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3893                                  "failed to truncate %s: %s\n",
3894                                  name, strerror(errno)));
3895                         goto fail; /* errno set by ftruncate */
3896                 }
3897         }
3898
3899         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3900             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3901             || (tdb->header.version != TDB_VERSION
3902                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3903                 /* its not a valid database - possibly initialise it */
3904                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3905                         errno = EIO; /* ie bad format or something */
3906                         goto fail;
3907                 }
3908                 rev = (tdb->flags & TDB_CONVERT);
3909         }
3910         vp = (unsigned char *)&tdb->header.version;
3911         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3912                   (((u32)vp[2]) << 8) | (u32)vp[3];
3913         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3914         if (!rev)
3915                 tdb->flags &= ~TDB_CONVERT;
3916         else {
3917                 tdb->flags |= TDB_CONVERT;
3918                 tdb_convert(&tdb->header, sizeof(tdb->header));
3919         }
3920         if (fstat(tdb->fd, &st) == -1)
3921                 goto fail;
3922
3923         if (tdb->header.rwlocks != 0) {
3924                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3925                 goto fail;
3926         }
3927
3928         /* Is it already in the open list?  If so, fail. */
3929         if (tdb_already_open(st.st_dev, st.st_ino)) {
3930                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3931                          "%s (%d,%d) is already open in this process\n",
3932                          name, (int)st.st_dev, (int)st.st_ino));
3933                 errno = EBUSY;
3934                 goto fail;
3935         }
3936
3937         if (!(tdb->name = (char *)strdup(name))) {
3938                 errno = ENOMEM;
3939                 goto fail;
3940         }
3941
3942         tdb->map_size = st.st_size;
3943         tdb->device = st.st_dev;
3944         tdb->inode = st.st_ino;
3945         tdb->max_dead_records = 0;
3946         tdb_mmap(tdb);
3947         if (locked) {
3948                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3949                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3950                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3951                                  name, strerror(errno)));
3952                         goto fail;
3953                 }
3954
3955         }
3956
3957         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3958            we didn't get the initial exclusive lock as we need to let all other
3959            users know we're using it. */
3960
3961         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3962                 /* leave this lock in place to indicate it's in use */
3963                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3964                         goto fail;
3965         }
3966
3967         /* if needed, run recovery */
3968         if (tdb_transaction_recover(tdb) == -1) {
3969                 goto fail;
3970         }
3971
3972  internal:
3973         /* Internal (memory-only) databases skip all the code above to
3974          * do with disk files, and resume here by releasing their
3975          * global lock and hooking into the active list. */
3976         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3977                 goto fail;
3978         tdb->next = tdbs;
3979         tdbs = tdb;
3980         return tdb;
3981
3982  fail:
3983         { int save_errno = errno;
3984
3985         if (!tdb)
3986                 return NULL;
3987
3988         if (tdb->map_ptr) {
3989                 if (tdb->flags & TDB_INTERNAL)
3990                         SAFE_FREE(tdb->map_ptr);
3991                 else
3992                         tdb_munmap(tdb);
3993         }
3994         SAFE_FREE(tdb->name);
3995         if (tdb->fd != -1)
3996                 if (close(tdb->fd) != 0)
3997                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3998         SAFE_FREE(tdb);
3999         errno = save_errno;
4000         return NULL;
4001         }
4002 }
4003
4004 /*
4005  * Set the maximum number of dead records per hash chain
4006  */
4007
4008 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4009 {
4010         tdb->max_dead_records = max_dead;
4011 }
4012
4013 /**
4014  * Close a database.
4015  *
4016  * @returns -1 for error; 0 for success.
4017  **/
4018 int tdb_close(struct tdb_context *tdb)
4019 {
4020         struct tdb_context **i;
4021         int ret = 0;
4022
4023         if (tdb->transaction) {
4024                 tdb_transaction_cancel(tdb);
4025         }
4026
4027         if (tdb->map_ptr) {
4028                 if (tdb->flags & TDB_INTERNAL)
4029                         SAFE_FREE(tdb->map_ptr);
4030                 else
4031                         tdb_munmap(tdb);
4032         }
4033         SAFE_FREE(tdb->name);
4034         if (tdb->fd != -1)
4035                 ret = close(tdb->fd);
4036         SAFE_FREE(tdb->lockrecs);
4037
4038         /* Remove from contexts list */
4039         for (i = &tdbs; *i; i = &(*i)->next) {
4040                 if (*i == tdb) {
4041                         *i = tdb->next;
4042                         break;
4043                 }
4044         }
4045
4046         memset(tdb, 0, sizeof(*tdb));
4047         SAFE_FREE(tdb);
4048
4049         return ret;
4050 }
4051
4052 /* register a loging function */
4053 void tdb_set_logging_function(struct tdb_context *tdb,
4054                               const struct tdb_logging_context *log_ctx)
4055 {
4056         tdb->log = *log_ctx;
4057 }
4058
4059 void *tdb_get_logging_private(struct tdb_context *tdb)
4060 {
4061         return tdb->log.log_private;
4062 }
4063
4064 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4065    seek pointer from our parent and to re-establish locks */
4066 int tdb_reopen(struct tdb_context *tdb)
4067 {
4068         struct stat st;
4069
4070         if (tdb->flags & TDB_INTERNAL) {
4071                 return 0; /* Nothing to do. */
4072         }
4073
4074         if (tdb->num_locks != 0 || tdb->global_lock.count) {
4075                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4076                 goto fail;
4077         }
4078
4079         if (tdb->transaction != 0) {
4080                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4081                 goto fail;
4082         }
4083
4084         if (tdb_munmap(tdb) != 0) {
4085                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4086                 goto fail;
4087         }
4088         if (close(tdb->fd) != 0)
4089                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4090         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4091         if (tdb->fd == -1) {
4092                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4093                 goto fail;
4094         }
4095         if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4096             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4097                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4098                 goto fail;
4099         }
4100         if (fstat(tdb->fd, &st) != 0) {
4101                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4102                 goto fail;
4103         }
4104         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4105                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4106                 goto fail;
4107         }
4108         tdb_mmap(tdb);
4109
4110         return 0;
4111
4112 fail:
4113         tdb_close(tdb);
4114         return -1;
4115 }
4116
4117 /* reopen all tdb's */
4118 int tdb_reopen_all(int parent_longlived)
4119 {
4120         struct tdb_context *tdb;
4121
4122         for (tdb=tdbs; tdb; tdb = tdb->next) {
4123                 /*
4124                  * If the parent is longlived (ie. a
4125                  * parent daemon architecture), we know
4126                  * it will keep it's active lock on a
4127                  * tdb opened with CLEAR_IF_FIRST. Thus
4128                  * for child processes we don't have to
4129                  * add an active lock. This is essential
4130                  * to improve performance on systems that
4131                  * keep POSIX locks as a non-scalable data
4132                  * structure in the kernel.
4133                  */
4134                 if (parent_longlived) {
4135                         /* Ensure no clear-if-first. */
4136                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4137                 }
4138
4139                 if (tdb_reopen(tdb) != 0)
4140                         return -1;
4141         }
4142
4143         return 0;
4144 }