Whamcloud - gitweb
Add configure --disable-tdb which disables e2fsck's scratch_files feature
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /*
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3 Rev: 23590
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5 */
6  /*
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #ifndef __FreeBSD__
40 #define _XOPEN_SOURCE 600
41 #endif
42
43 #include "config.h"
44 #include <unistd.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <stdarg.h>
48 #include <stddef.h>
49 #include <errno.h>
50 #include <string.h>
51 #ifdef HAVE_SYS_SELECT_H
52 #include <sys/select.h>
53 #endif
54 #include <sys/time.h>
55 #include <sys/types.h>
56 #include <time.h>
57 #ifdef HAVE_UTIME_H
58 #include <utime.h>
59 #endif
60 #include <sys/stat.h>
61 #include <sys/file.h>
62 #include <fcntl.h>
63
64 #ifdef HAVE_SYS_MMAN_H
65 #include <sys/mman.h>
66 #endif
67
68 #ifndef MAP_FILE
69 #define MAP_FILE 0
70 #endif
71
72 #ifndef MAP_FAILED
73 #define MAP_FAILED ((void *)-1)
74 #endif
75
76 #ifndef HAVE_STRDUP
77 #define strdup rep_strdup
78 static char *rep_strdup(const char *s)
79 {
80         char *ret;
81         int length;
82         if (!s)
83                 return NULL;
84
85         if (!length)
86                 length = strlen(s);
87
88         ret = malloc(length + 1);
89         if (ret) {
90                 strncpy(ret, s, length);
91                 ret[length] = '\0';
92         }
93         return ret;
94 }
95 #endif
96
97 #ifndef PRINTF_ATTRIBUTE
98 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
99 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
100  * the parameter containing the format, and a2 the index of the first
101  * argument. Note that some gcc 2.x versions don't handle this
102  * properly **/
103 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
104 #else
105 #define PRINTF_ATTRIBUTE(a1, a2)
106 #endif
107 #endif
108
109 typedef int bool;
110
111 #include "tdb.h"
112
113 static TDB_DATA tdb_null;
114
115 #ifndef u32
116 #define u32 unsigned
117 #endif
118
119 typedef u32 tdb_len_t;
120 typedef u32 tdb_off_t;
121
122 #ifndef offsetof
123 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
124 #endif
125
126 #define TDB_MAGIC_FOOD "TDB file\n"
127 #define TDB_VERSION (0x26011967 + 6)
128 #define TDB_MAGIC (0x26011999U)
129 #define TDB_FREE_MAGIC (~TDB_MAGIC)
130 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
131 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
132 #define TDB_ALIGNMENT 4
133 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
134 #define DEFAULT_HASH_SIZE 131
135 #define FREELIST_TOP (sizeof(struct tdb_header))
136 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
137 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
138 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
139 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
140 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
141 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
142 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
143 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
144 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
145 #define TDB_PAD_BYTE 0x42
146 #define TDB_PAD_U32  0x42424242
147
148 /* NB assumes there is a local variable called "tdb" that is the
149  * current context, also takes doubly-parenthesized print-style
150  * argument. */
151 #define TDB_LOG(x) tdb->log.log_fn x
152
153 /* lock offsets */
154 #define GLOBAL_LOCK      0
155 #define ACTIVE_LOCK      4
156 #define TRANSACTION_LOCK 8
157
158 /* free memory if the pointer is valid and zero the pointer */
159 #ifndef SAFE_FREE
160 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
161 #endif
162
163 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
164
165 #define DOCONV() (tdb->flags & TDB_CONVERT)
166 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
167
168
169 /* the body of the database is made of one list_struct for the free space
170    plus a separate data list for each hash value */
171 struct list_struct {
172         tdb_off_t next; /* offset of the next record in the list */
173         tdb_len_t rec_len; /* total byte length of record */
174         tdb_len_t key_len; /* byte length of key */
175         tdb_len_t data_len; /* byte length of data */
176         u32 full_hash; /* the full 32 bit hash of the key */
177         u32 magic;   /* try to catch errors */
178         /* the following union is implied:
179                 union {
180                         char record[rec_len];
181                         struct {
182                                 char key[key_len];
183                                 char data[data_len];
184                         }
185                         u32 totalsize; (tailer)
186                 }
187         */
188 };
189
190
191 /* this is stored at the front of every database */
192 struct tdb_header {
193         char magic_food[32]; /* for /etc/magic */
194         u32 version; /* version of the code */
195         u32 hash_size; /* number of hash entries */
196         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
197         tdb_off_t recovery_start; /* offset of transaction recovery region */
198         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
199         tdb_off_t reserved[29];
200 };
201
202 struct tdb_lock_type {
203         int list;
204         u32 count;
205         u32 ltype;
206 };
207
208 struct tdb_traverse_lock {
209         struct tdb_traverse_lock *next;
210         u32 off;
211         u32 hash;
212         int lock_rw;
213 };
214
215
216 struct tdb_methods {
217         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
218         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
219         void (*next_hash_chain)(struct tdb_context *, u32 *);
220         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
221         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
222         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
223 };
224
225 struct tdb_context {
226         char *name; /* the name of the database */
227         void *map_ptr; /* where it is currently mapped */
228         int fd; /* open file descriptor for the database */
229         tdb_len_t map_size; /* how much space has been mapped */
230         int read_only; /* opened read-only */
231         int traverse_read; /* read-only traversal */
232         struct tdb_lock_type global_lock;
233         int num_lockrecs;
234         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
235         enum TDB_ERROR ecode; /* error code for last tdb error */
236         struct tdb_header header; /* a cached copy of the header */
237         u32 flags; /* the flags passed to tdb_open */
238         struct tdb_traverse_lock travlocks; /* current traversal locks */
239         struct tdb_context *next; /* all tdbs to avoid multiple opens */
240         dev_t device;   /* uniquely identifies this tdb */
241         ino_t inode;    /* uniquely identifies this tdb */
242         struct tdb_logging_context log;
243         unsigned int (*hash_fn)(TDB_DATA *key);
244         int open_flags; /* flags used in the open - needed by reopen */
245         unsigned int num_locks; /* number of chain locks held */
246         const struct tdb_methods *methods;
247         struct tdb_transaction *transaction;
248         int page_size;
249         int max_dead_records;
250         bool have_transaction_lock;
251         tdb_len_t real_map_size; /* how much space has been mapped */
252 };
253
254
255 /*
256   internal prototypes
257 */
258 static int tdb_munmap(struct tdb_context *tdb);
259 static void tdb_mmap(struct tdb_context *tdb);
260 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
261 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
262 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
263 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
264 static int tdb_transaction_unlock(struct tdb_context *tdb);
265 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
266 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
267 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
268 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
269 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
270 static void *tdb_convert(void *buf, u32 size);
271 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
272 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
273 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
274 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
275 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
276 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
277 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
278 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
279 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
280 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
281 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
282                    tdb_off_t offset, tdb_len_t len,
283                    int (*parser)(TDB_DATA key, TDB_DATA data,
284                                  void *private_data),
285                    void *private_data);
286 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
287                            struct list_struct *rec);
288 static void tdb_io_init(struct tdb_context *tdb);
289 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
290 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
291                       struct list_struct *rec);
292
293
294 /* file: error.c */
295
296 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
297 {
298         return tdb->ecode;
299 }
300
301 static struct tdb_errname {
302         enum TDB_ERROR ecode; const char *estring;
303 } emap[] = { {TDB_SUCCESS, "Success"},
304              {TDB_ERR_CORRUPT, "Corrupt database"},
305              {TDB_ERR_IO, "IO Error"},
306              {TDB_ERR_LOCK, "Locking error"},
307              {TDB_ERR_OOM, "Out of memory"},
308              {TDB_ERR_EXISTS, "Record exists"},
309              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
310              {TDB_ERR_EINVAL, "Invalid parameter"},
311              {TDB_ERR_NOEXIST, "Record does not exist"},
312              {TDB_ERR_RDONLY, "write not permitted"} };
313
314 /* Error string for the last tdb error */
315 const char *tdb_errorstr(struct tdb_context *tdb)
316 {
317         u32 i;
318         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
319                 if (tdb->ecode == emap[i].ecode)
320                         return emap[i].estring;
321         return "Invalid error code";
322 }
323
324 /* file: lock.c */
325
326 #define TDB_MARK_LOCK 0x80000000
327
328 /* a byte range locking function - return 0 on success
329    this functions locks/unlocks 1 byte at the specified offset.
330
331    On error, errno is also set so that errors are passed back properly
332    through tdb_open().
333
334    note that a len of zero means lock to end of file
335 */
336 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
337                int rw_type, int lck_type, int probe, size_t len)
338 {
339         struct flock fl;
340         int ret;
341
342         if (tdb->flags & TDB_NOLOCK) {
343                 return 0;
344         }
345
346         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
347                 tdb->ecode = TDB_ERR_RDONLY;
348                 return -1;
349         }
350
351         fl.l_type = rw_type;
352         fl.l_whence = SEEK_SET;
353         fl.l_start = offset;
354         fl.l_len = len;
355         fl.l_pid = 0;
356
357         do {
358                 ret = fcntl(tdb->fd,lck_type,&fl);
359         } while (ret == -1 && errno == EINTR);
360
361         if (ret == -1) {
362                 /* Generic lock error. errno set by fcntl.
363                  * EAGAIN is an expected return from non-blocking
364                  * locks. */
365                 if (!probe && lck_type != F_SETLK) {
366                         /* Ensure error code is set for log fun to examine. */
367                         tdb->ecode = TDB_ERR_LOCK;
368                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
369                                  tdb->fd, offset, rw_type, lck_type, (int)len));
370                 }
371                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
372         }
373         return 0;
374 }
375
376
377 /*
378   upgrade a read lock to a write lock. This needs to be handled in a
379   special way as some OSes (such as solaris) have too conservative
380   deadlock detection and claim a deadlock when progress can be
381   made. For those OSes we may loop for a while.
382 */
383 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
384 {
385         int count = 1000;
386         while (count--) {
387                 struct timeval tv;
388                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
389                         return 0;
390                 }
391                 if (errno != EDEADLK) {
392                         break;
393                 }
394                 /* sleep for as short a time as we can - more portable than usleep() */
395                 tv.tv_sec = 0;
396                 tv.tv_usec = 1;
397                 select(0, NULL, NULL, NULL, &tv);
398         }
399         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
400         return -1;
401 }
402
403
404 /* lock a list in the database. list -1 is the alloc list */
405 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
406 {
407         struct tdb_lock_type *new_lck;
408         int i;
409         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
410
411         ltype &= ~TDB_MARK_LOCK;
412
413         /* a global lock allows us to avoid per chain locks */
414         if (tdb->global_lock.count &&
415             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
416                 return 0;
417         }
418
419         if (tdb->global_lock.count) {
420                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
421         }
422
423         if (list < -1 || list >= (int)tdb->header.hash_size) {
424                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
425                            list, ltype));
426                 return -1;
427         }
428         if (tdb->flags & TDB_NOLOCK)
429                 return 0;
430
431         for (i=0; i<tdb->num_lockrecs; i++) {
432                 if (tdb->lockrecs[i].list == list) {
433                         if (tdb->lockrecs[i].count == 0) {
434                                 /*
435                                  * Can't happen, see tdb_unlock(). It should
436                                  * be an assert.
437                                  */
438                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
439                                          "lck->count == 0 for list %d", list));
440                         }
441                         /*
442                          * Just increment the in-memory struct, posix locks
443                          * don't stack.
444                          */
445                         tdb->lockrecs[i].count++;
446                         return 0;
447                 }
448         }
449
450         new_lck = (struct tdb_lock_type *)realloc(
451                 tdb->lockrecs,
452                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
453         if (new_lck == NULL) {
454                 errno = ENOMEM;
455                 return -1;
456         }
457         tdb->lockrecs = new_lck;
458
459         /* Since fcntl locks don't nest, we do a lock for the first one,
460            and simply bump the count for future ones */
461         if (!mark_lock &&
462             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
463                                      0, 1)) {
464                 return -1;
465         }
466
467         tdb->num_locks++;
468
469         tdb->lockrecs[tdb->num_lockrecs].list = list;
470         tdb->lockrecs[tdb->num_lockrecs].count = 1;
471         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
472         tdb->num_lockrecs += 1;
473
474         return 0;
475 }
476
477 /* lock a list in the database. list -1 is the alloc list */
478 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
479 {
480         int ret;
481         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
482         if (ret) {
483                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
484                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
485         }
486         return ret;
487 }
488
489 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
490 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
491 {
492         return _tdb_lock(tdb, list, ltype, F_SETLK);
493 }
494
495
496 /* unlock the database: returns void because it's too late for errors. */
497         /* changed to return int it may be interesting to know there
498            has been an error  --simo */
499 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
500 {
501         int ret = -1;
502         int i;
503         struct tdb_lock_type *lck = NULL;
504         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
505
506         ltype &= ~TDB_MARK_LOCK;
507
508         /* a global lock allows us to avoid per chain locks */
509         if (tdb->global_lock.count &&
510             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
511                 return 0;
512         }
513
514         if (tdb->global_lock.count) {
515                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
516         }
517
518         if (tdb->flags & TDB_NOLOCK)
519                 return 0;
520
521         /* Sanity checks */
522         if (list < -1 || list >= (int)tdb->header.hash_size) {
523                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
524                 return ret;
525         }
526
527         for (i=0; i<tdb->num_lockrecs; i++) {
528                 if (tdb->lockrecs[i].list == list) {
529                         lck = &tdb->lockrecs[i];
530                         break;
531                 }
532         }
533
534         if ((lck == NULL) || (lck->count == 0)) {
535                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
536                 return -1;
537         }
538
539         if (lck->count > 1) {
540                 lck->count--;
541                 return 0;
542         }
543
544         /*
545          * This lock has count==1 left, so we need to unlock it in the
546          * kernel. We don't bother with decrementing the in-memory array
547          * element, we're about to overwrite it with the last array element
548          * anyway.
549          */
550
551         if (mark_lock) {
552                 ret = 0;
553         } else {
554                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
555                                                F_SETLKW, 0, 1);
556         }
557         tdb->num_locks--;
558
559         /*
560          * Shrink the array by overwriting the element just unlocked with the
561          * last array element.
562          */
563
564         if (tdb->num_lockrecs > 1) {
565                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
566         }
567         tdb->num_lockrecs -= 1;
568
569         /*
570          * We don't bother with realloc when the array shrinks, but if we have
571          * a completely idle tdb we should get rid of the locked array.
572          */
573
574         if (tdb->num_lockrecs == 0) {
575                 SAFE_FREE(tdb->lockrecs);
576         }
577
578         if (ret)
579                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
580         return ret;
581 }
582
583 /*
584   get the transaction lock
585  */
586 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
587 {
588         if (tdb->have_transaction_lock || tdb->global_lock.count) {
589                 return 0;
590         }
591         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
592                                      F_SETLKW, 0, 1) == -1) {
593                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
594                 tdb->ecode = TDB_ERR_LOCK;
595                 return -1;
596         }
597         tdb->have_transaction_lock = 1;
598         return 0;
599 }
600
601 /*
602   release the transaction lock
603  */
604 int tdb_transaction_unlock(struct tdb_context *tdb)
605 {
606         int ret;
607         if (!tdb->have_transaction_lock) {
608                 return 0;
609         }
610         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
611         if (ret == 0) {
612                 tdb->have_transaction_lock = 0;
613         }
614         return ret;
615 }
616
617
618
619
620 /* lock/unlock entire database */
621 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
622 {
623         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
624
625         ltype &= ~TDB_MARK_LOCK;
626
627         /* There are no locks on read-only dbs */
628         if (tdb->read_only || tdb->traverse_read)
629                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
630
631         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
632                 tdb->global_lock.count++;
633                 return 0;
634         }
635
636         if (tdb->global_lock.count) {
637                 /* a global lock of a different type exists */
638                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
639         }
640
641         if (tdb->num_locks != 0) {
642                 /* can't combine global and chain locks */
643                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
644         }
645
646         if (!mark_lock &&
647             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
648                                      0, 4*tdb->header.hash_size)) {
649                 if (op == F_SETLKW) {
650                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
651                 }
652                 return -1;
653         }
654
655         tdb->global_lock.count = 1;
656         tdb->global_lock.ltype = ltype;
657
658         return 0;
659 }
660
661
662
663 /* unlock entire db */
664 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
665 {
666         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
667
668         ltype &= ~TDB_MARK_LOCK;
669
670         /* There are no locks on read-only dbs */
671         if (tdb->read_only || tdb->traverse_read) {
672                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
673         }
674
675         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
676                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
677         }
678
679         if (tdb->global_lock.count > 1) {
680                 tdb->global_lock.count--;
681                 return 0;
682         }
683
684         if (!mark_lock &&
685             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
686                                      0, 4*tdb->header.hash_size)) {
687                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
688                 return -1;
689         }
690
691         tdb->global_lock.count = 0;
692         tdb->global_lock.ltype = 0;
693
694         return 0;
695 }
696
697 /* lock entire database with write lock */
698 int tdb_lockall(struct tdb_context *tdb)
699 {
700         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
701 }
702
703 /* lock entire database with write lock - mark only */
704 int tdb_lockall_mark(struct tdb_context *tdb)
705 {
706         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
707 }
708
709 /* unlock entire database with write lock - unmark only */
710 int tdb_lockall_unmark(struct tdb_context *tdb)
711 {
712         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
713 }
714
715 /* lock entire database with write lock - nonblocking varient */
716 int tdb_lockall_nonblock(struct tdb_context *tdb)
717 {
718         return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
719 }
720
721 /* unlock entire database with write lock */
722 int tdb_unlockall(struct tdb_context *tdb)
723 {
724         return _tdb_unlockall(tdb, F_WRLCK);
725 }
726
727 /* lock entire database with read lock */
728 int tdb_lockall_read(struct tdb_context *tdb)
729 {
730         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
731 }
732
733 /* lock entire database with read lock - nonblock varient */
734 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
735 {
736         return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
737 }
738
739 /* unlock entire database with read lock */
740 int tdb_unlockall_read(struct tdb_context *tdb)
741 {
742         return _tdb_unlockall(tdb, F_RDLCK);
743 }
744
745 /* lock/unlock one hash chain. This is meant to be used to reduce
746    contention - it cannot guarantee how many records will be locked */
747 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
748 {
749         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
750 }
751
752 /* lock/unlock one hash chain, non-blocking. This is meant to be used
753    to reduce contention - it cannot guarantee how many records will be
754    locked */
755 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
756 {
757         return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
758 }
759
760 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
761 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
762 {
763         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
764 }
765
766 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
767 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
768 {
769         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
770 }
771
772 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
773 {
774         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
775 }
776
777 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
778 {
779         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
780 }
781
782 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
783 {
784         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
785 }
786
787
788
789 /* record lock stops delete underneath */
790 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
791 {
792         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
793 }
794
795 /*
796   Write locks override our own fcntl readlocks, so check it here.
797   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
798   an error to fail to get the lock here.
799 */
800 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
801 {
802         struct tdb_traverse_lock *i;
803         for (i = &tdb->travlocks; i; i = i->next)
804                 if (i->off == off)
805                         return -1;
806         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
807 }
808
809 /*
810   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
811   an error to fail to get the lock here.
812 */
813 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
814 {
815         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
816 }
817
818 /* fcntl locks don't stack: avoid unlocking someone else's */
819 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
820 {
821         struct tdb_traverse_lock *i;
822         u32 count = 0;
823
824         if (off == 0)
825                 return 0;
826         for (i = &tdb->travlocks; i; i = i->next)
827                 if (i->off == off)
828                         count++;
829         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
830 }
831
832 /* file: io.c */
833
834 /* check for an out of bounds access - if it is out of bounds then
835    see if the database has been expanded by someone else and expand
836    if necessary
837    note that "len" is the minimum length needed for the db
838 */
839 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
840 {
841         struct stat st;
842         if (len <= tdb->map_size)
843                 return 0;
844         if (tdb->flags & TDB_INTERNAL) {
845                 if (!probe) {
846                         /* Ensure ecode is set for log fn. */
847                         tdb->ecode = TDB_ERR_IO;
848                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
849                                  (int)len, (int)tdb->map_size));
850                 }
851                 return TDB_ERRCODE(TDB_ERR_IO, -1);
852         }
853
854         if (fstat(tdb->fd, &st) == -1) {
855                 return TDB_ERRCODE(TDB_ERR_IO, -1);
856         }
857
858         if (st.st_size < (size_t)len) {
859                 if (!probe) {
860                         /* Ensure ecode is set for log fn. */
861                         tdb->ecode = TDB_ERR_IO;
862                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
863                                  (int)len, (int)st.st_size));
864                 }
865                 return TDB_ERRCODE(TDB_ERR_IO, -1);
866         }
867
868         /* Unmap, update size, remap */
869         if (tdb_munmap(tdb) == -1)
870                 return TDB_ERRCODE(TDB_ERR_IO, -1);
871         tdb->map_size = st.st_size;
872         tdb_mmap(tdb);
873         return 0;
874 }
875
876 /* write a lump of data at a specified offset */
877 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
878                      const void *buf, tdb_len_t len)
879 {
880         if (len == 0) {
881                 return 0;
882         }
883
884         if (tdb->read_only || tdb->traverse_read) {
885                 tdb->ecode = TDB_ERR_RDONLY;
886                 return -1;
887         }
888
889         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
890                 return -1;
891
892         if (tdb->map_ptr) {
893                 memcpy(off + (char *)tdb->map_ptr, buf, len);
894         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
895                 /* Ensure ecode is set for log fn. */
896                 tdb->ecode = TDB_ERR_IO;
897                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
898                            off, len, strerror(errno)));
899                 return TDB_ERRCODE(TDB_ERR_IO, -1);
900         }
901         return 0;
902 }
903
904 /* Endian conversion: we only ever deal with 4 byte quantities */
905 void *tdb_convert(void *buf, u32 size)
906 {
907         u32 i, *p = (u32 *)buf;
908         for (i = 0; i < size / 4; i++)
909                 p[i] = TDB_BYTEREV(p[i]);
910         return buf;
911 }
912
913
914 /* read a lump of data at a specified offset, maybe convert */
915 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
916                     tdb_len_t len, int cv)
917 {
918         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
919                 return -1;
920         }
921
922         if (tdb->map_ptr) {
923                 memcpy(buf, off + (char *)tdb->map_ptr, len);
924         } else {
925                 ssize_t ret = pread(tdb->fd, buf, len, off);
926                 if (ret != (ssize_t)len) {
927                         /* Ensure ecode is set for log fn. */
928                         tdb->ecode = TDB_ERR_IO;
929                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
930                                  "len=%d ret=%d (%s) map_size=%d\n",
931                                  (int)off, (int)len, (int)ret, strerror(errno),
932                                  (int)tdb->map_size));
933                         return TDB_ERRCODE(TDB_ERR_IO, -1);
934                 }
935         }
936         if (cv) {
937                 tdb_convert(buf, len);
938         }
939         return 0;
940 }
941
942
943
944 /*
945   do an unlocked scan of the hash table heads to find the next non-zero head. The value
946   will then be confirmed with the lock held
947 */
948 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
949 {
950         u32 h = *chain;
951         if (tdb->map_ptr) {
952                 for (;h < tdb->header.hash_size;h++) {
953                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
954                                 break;
955                         }
956                 }
957         } else {
958                 u32 off=0;
959                 for (;h < tdb->header.hash_size;h++) {
960                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
961                                 break;
962                         }
963                 }
964         }
965         (*chain) = h;
966 }
967
968
969 int tdb_munmap(struct tdb_context *tdb)
970 {
971         if (tdb->flags & TDB_INTERNAL)
972                 return 0;
973
974 #ifdef HAVE_MMAP
975         if (tdb->map_ptr) {
976                 int ret = munmap(tdb->map_ptr, tdb->real_map_size);
977                 if (ret != 0)
978                         return ret;
979                 tdb->real_map_size = 0;
980         }
981 #endif
982         tdb->map_ptr = NULL;
983         return 0;
984 }
985
986 void tdb_mmap(struct tdb_context *tdb)
987 {
988         if (tdb->flags & TDB_INTERNAL)
989                 return;
990
991 #ifdef HAVE_MMAP
992         if (!(tdb->flags & TDB_NOMMAP)) {
993                 tdb->map_ptr = mmap(NULL, tdb->map_size,
994                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE),
995                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
996
997                 /*
998                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
999                  */
1000
1001                 if (tdb->map_ptr == MAP_FAILED) {
1002                         tdb->real_map_size = 0;
1003                         tdb->map_ptr = NULL;
1004                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1005                                  tdb->map_size, strerror(errno)));
1006                 }
1007                 tdb->real_map_size = tdb->map_size;
1008         } else {
1009                 tdb->map_ptr = NULL;
1010         }
1011 #else
1012         tdb->map_ptr = NULL;
1013 #endif
1014 }
1015
1016 /* expand a file.  we prefer to use ftruncate, as that is what posix
1017   says to use for mmap expansion */
1018 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1019 {
1020         char buf[1024];
1021
1022         if (tdb->read_only || tdb->traverse_read) {
1023                 tdb->ecode = TDB_ERR_RDONLY;
1024                 return -1;
1025         }
1026
1027         if (ftruncate(tdb->fd, size+addition) == -1) {
1028                 char b = 0;
1029                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1030                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1031                                  size+addition, strerror(errno)));
1032                         return -1;
1033                 }
1034         }
1035
1036         /* now fill the file with something. This ensures that the
1037            file isn't sparse, which would be very bad if we ran out of
1038            disk. This must be done with write, not via mmap */
1039         memset(buf, TDB_PAD_BYTE, sizeof(buf));
1040         while (addition) {
1041                 int n = addition>sizeof(buf)?sizeof(buf):addition;
1042                 int ret = pwrite(tdb->fd, buf, n, size);
1043                 if (ret != n) {
1044                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1045                                    n, strerror(errno)));
1046                         return -1;
1047                 }
1048                 addition -= n;
1049                 size += n;
1050         }
1051         return 0;
1052 }
1053
1054
1055 /* expand the database at least size bytes by expanding the underlying
1056    file and doing the mmap again if necessary */
1057 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1058 {
1059         struct list_struct rec;
1060         tdb_off_t offset;
1061
1062         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1063                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1064                 return -1;
1065         }
1066
1067         /* must know about any previous expansions by another process */
1068         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1069
1070         /* always make room for at least 10 more records, and round
1071            the database up to a multiple of the page size */
1072         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1073
1074         if (!(tdb->flags & TDB_INTERNAL))
1075                 tdb_munmap(tdb);
1076
1077         /*
1078          * We must ensure the file is unmapped before doing this
1079          * to ensure consistency with systems like OpenBSD where
1080          * writes and mmaps are not consistent.
1081          */
1082
1083         /* expand the file itself */
1084         if (!(tdb->flags & TDB_INTERNAL)) {
1085                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1086                         goto fail;
1087         }
1088
1089         tdb->map_size += size;
1090
1091         if (tdb->flags & TDB_INTERNAL) {
1092                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1093                                                     tdb->map_size);
1094                 if (!new_map_ptr) {
1095                         tdb->map_size -= size;
1096                         goto fail;
1097                 }
1098                 tdb->map_ptr = new_map_ptr;
1099         } else {
1100                 /*
1101                  * We must ensure the file is remapped before adding the space
1102                  * to ensure consistency with systems like OpenBSD where
1103                  * writes and mmaps are not consistent.
1104                  */
1105
1106                 /* We're ok if the mmap fails as we'll fallback to read/write */
1107                 tdb_mmap(tdb);
1108         }
1109
1110         /* form a new freelist record */
1111         memset(&rec,'\0',sizeof(rec));
1112         rec.rec_len = size - sizeof(rec);
1113
1114         /* link it into the free list */
1115         offset = tdb->map_size - size;
1116         if (tdb_free(tdb, offset, &rec) == -1)
1117                 goto fail;
1118
1119         tdb_unlock(tdb, -1, F_WRLCK);
1120         return 0;
1121  fail:
1122         tdb_unlock(tdb, -1, F_WRLCK);
1123         return -1;
1124 }
1125
1126 /* read/write a tdb_off_t */
1127 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1128 {
1129         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1130 }
1131
1132 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1133 {
1134         tdb_off_t off = *d;
1135         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1136 }
1137
1138
1139 /* read a lump of data, allocating the space for it */
1140 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1141 {
1142         unsigned char *buf;
1143
1144         /* some systems don't like zero length malloc */
1145         if (len == 0) {
1146                 len = 1;
1147         }
1148
1149         if (!(buf = (unsigned char *)malloc(len))) {
1150                 /* Ensure ecode is set for log fn. */
1151                 tdb->ecode = TDB_ERR_OOM;
1152                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1153                            len, strerror(errno)));
1154                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1155         }
1156         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1157                 SAFE_FREE(buf);
1158                 return NULL;
1159         }
1160         return buf;
1161 }
1162
1163 /* Give a piece of tdb data to a parser */
1164
1165 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1166                    tdb_off_t offset, tdb_len_t len,
1167                    int (*parser)(TDB_DATA key, TDB_DATA data,
1168                                  void *private_data),
1169                    void *private_data)
1170 {
1171         TDB_DATA data;
1172         int result;
1173
1174         data.dsize = len;
1175
1176         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1177                 /*
1178                  * Optimize by avoiding the malloc/memcpy/free, point the
1179                  * parser directly at the mmap area.
1180                  */
1181                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1182                         return -1;
1183                 }
1184                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1185                 return parser(key, data, private_data);
1186         }
1187
1188         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1189                 return -1;
1190         }
1191
1192         result = parser(key, data, private_data);
1193         free(data.dptr);
1194         return result;
1195 }
1196
1197 /* read/write a record */
1198 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1199 {
1200         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1201                 return -1;
1202         if (TDB_BAD_MAGIC(rec)) {
1203                 /* Ensure ecode is set for log fn. */
1204                 tdb->ecode = TDB_ERR_CORRUPT;
1205                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1206                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1207         }
1208         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1209 }
1210
1211 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1212 {
1213         struct list_struct r = *rec;
1214         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1215 }
1216
1217 static const struct tdb_methods io_methods = {
1218         tdb_read,
1219         tdb_write,
1220         tdb_next_hash_chain,
1221         tdb_oob,
1222         tdb_expand_file,
1223         tdb_brlock
1224 };
1225
1226 /*
1227   initialise the default methods table
1228 */
1229 void tdb_io_init(struct tdb_context *tdb)
1230 {
1231         tdb->methods = &io_methods;
1232 }
1233
1234 /* file: transaction.c */
1235
1236 /*
1237   transaction design:
1238
1239   - only allow a single transaction at a time per database. This makes
1240     using the transaction API simpler, as otherwise the caller would
1241     have to cope with temporary failures in transactions that conflict
1242     with other current transactions
1243
1244   - keep the transaction recovery information in the same file as the
1245     database, using a special 'transaction recovery' record pointed at
1246     by the header. This removes the need for extra journal files as
1247     used by some other databases
1248
1249   - dynamically allocated the transaction recover record, re-using it
1250     for subsequent transactions. If a larger record is needed then
1251     tdb_free() the old record to place it on the normal tdb freelist
1252     before allocating the new record
1253
1254   - during transactions, keep a linked list of writes all that have
1255     been performed by intercepting all tdb_write() calls. The hooked
1256     transaction versions of tdb_read() and tdb_write() check this
1257     linked list and try to use the elements of the list in preference
1258     to the real database.
1259
1260   - don't allow any locks to be held when a transaction starts,
1261     otherwise we can end up with deadlock (plus lack of lock nesting
1262     in posix locks would mean the lock is lost)
1263
1264   - if the caller gains a lock during the transaction but doesn't
1265     release it then fail the commit
1266
1267   - allow for nested calls to tdb_transaction_start(), re-using the
1268     existing transaction record. If the inner transaction is cancelled
1269     then a subsequent commit will fail
1270
1271   - keep a mirrored copy of the tdb hash chain heads to allow for the
1272     fast hash heads scan on traverse, updating the mirrored copy in
1273     the transaction version of tdb_write
1274
1275   - allow callers to mix transaction and non-transaction use of tdb,
1276     although once a transaction is started then an exclusive lock is
1277     gained until the transaction is committed or cancelled
1278
1279   - the commit stategy involves first saving away all modified data
1280     into a linearised buffer in the transaction recovery area, then
1281     marking the transaction recovery area with a magic value to
1282     indicate a valid recovery record. In total 4 fsync/msync calls are
1283     needed per commit to prevent race conditions. It might be possible
1284     to reduce this to 3 or even 2 with some more work.
1285
1286   - check for a valid recovery record on open of the tdb, while the
1287     global lock is held. Automatically recover from the transaction
1288     recovery area if needed, then continue with the open as
1289     usual. This allows for smooth crash recovery with no administrator
1290     intervention.
1291
1292   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1293     still available, but no transaction recovery area is used and no
1294     fsync/msync calls are made.
1295
1296 */
1297
1298 struct tdb_transaction_el {
1299         struct tdb_transaction_el *next, *prev;
1300         tdb_off_t offset;
1301         tdb_len_t length;
1302         unsigned char *data;
1303 };
1304
1305 /*
1306   hold the context of any current transaction
1307 */
1308 struct tdb_transaction {
1309         /* we keep a mirrored copy of the tdb hash heads here so
1310            tdb_next_hash_chain() can operate efficiently */
1311         u32 *hash_heads;
1312
1313         /* the original io methods - used to do IOs to the real db */
1314         const struct tdb_methods *io_methods;
1315
1316         /* the list of transaction elements. We use a doubly linked
1317            list with a last pointer to allow us to keep the list
1318            ordered, with first element at the front of the list. It
1319            needs to be doubly linked as the read/write traversals need
1320            to be backwards, while the commit needs to be forwards */
1321         struct tdb_transaction_el *elements, *elements_last;
1322
1323         /* non-zero when an internal transaction error has
1324            occurred. All write operations will then fail until the
1325            transaction is ended */
1326         int transaction_error;
1327
1328         /* when inside a transaction we need to keep track of any
1329            nested tdb_transaction_start() calls, as these are allowed,
1330            but don't create a new transaction */
1331         int nesting;
1332
1333         /* old file size before transaction */
1334         tdb_len_t old_map_size;
1335 };
1336
1337
1338 /*
1339   read while in a transaction. We need to check first if the data is in our list
1340   of transaction elements, then if not do a real read
1341 */
1342 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1343                             tdb_len_t len, int cv)
1344 {
1345         struct tdb_transaction_el *el;
1346
1347         /* we need to walk the list backwards to get the most recent data */
1348         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1349                 tdb_len_t partial;
1350
1351                 if (off+len <= el->offset) {
1352                         continue;
1353                 }
1354                 if (off >= el->offset + el->length) {
1355                         continue;
1356                 }
1357
1358                 /* an overlapping read - needs to be split into up to
1359                    2 reads and a memcpy */
1360                 if (off < el->offset) {
1361                         partial = el->offset - off;
1362                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1363                                 goto fail;
1364                         }
1365                         len -= partial;
1366                         off += partial;
1367                         buf = (void *)(partial + (char *)buf);
1368                 }
1369                 if (off + len <= el->offset + el->length) {
1370                         partial = len;
1371                 } else {
1372                         partial = el->offset + el->length - off;
1373                 }
1374                 memcpy(buf, el->data + (off - el->offset), partial);
1375                 if (cv) {
1376                         tdb_convert(buf, len);
1377                 }
1378                 len -= partial;
1379                 off += partial;
1380                 buf = (void *)(partial + (char *)buf);
1381
1382                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1383                         goto fail;
1384                 }
1385
1386                 return 0;
1387         }
1388
1389         /* its not in the transaction elements - do a real read */
1390         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1391
1392 fail:
1393         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1394         tdb->ecode = TDB_ERR_IO;
1395         tdb->transaction->transaction_error = 1;
1396         return -1;
1397 }
1398
1399
1400 /*
1401   write while in a transaction
1402 */
1403 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1404                              const void *buf, tdb_len_t len)
1405 {
1406         struct tdb_transaction_el *el, *best_el=NULL;
1407
1408         if (len == 0) {
1409                 return 0;
1410         }
1411
1412         /* if the write is to a hash head, then update the transaction
1413            hash heads */
1414         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1415             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1416                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1417                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1418         }
1419
1420         /* first see if we can replace an existing entry */
1421         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1422                 tdb_len_t partial;
1423
1424                 if (best_el == NULL && off == el->offset+el->length) {
1425                         best_el = el;
1426                 }
1427
1428                 if (off+len <= el->offset) {
1429                         continue;
1430                 }
1431                 if (off >= el->offset + el->length) {
1432                         continue;
1433                 }
1434
1435                 /* an overlapping write - needs to be split into up to
1436                    2 writes and a memcpy */
1437                 if (off < el->offset) {
1438                         partial = el->offset - off;
1439                         if (transaction_write(tdb, off, buf, partial) != 0) {
1440                                 goto fail;
1441                         }
1442                         len -= partial;
1443                         off += partial;
1444                         buf = (const void *)(partial + (const char *)buf);
1445                 }
1446                 if (off + len <= el->offset + el->length) {
1447                         partial = len;
1448                 } else {
1449                         partial = el->offset + el->length - off;
1450                 }
1451                 memcpy(el->data + (off - el->offset), buf, partial);
1452                 len -= partial;
1453                 off += partial;
1454                 buf = (const void *)(partial + (const char *)buf);
1455
1456                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1457                         goto fail;
1458                 }
1459
1460                 return 0;
1461         }
1462
1463         /* see if we can append the new entry to an existing entry */
1464         if (best_el && best_el->offset + best_el->length == off &&
1465             (off+len < tdb->transaction->old_map_size ||
1466              off > tdb->transaction->old_map_size)) {
1467                 unsigned char *data = best_el->data;
1468                 el = best_el;
1469                 el->data = (unsigned char *)realloc(el->data,
1470                                                     el->length + len);
1471                 if (el->data == NULL) {
1472                         tdb->ecode = TDB_ERR_OOM;
1473                         tdb->transaction->transaction_error = 1;
1474                         el->data = data;
1475                         return -1;
1476                 }
1477                 if (buf) {
1478                         memcpy(el->data + el->length, buf, len);
1479                 } else {
1480                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1481                 }
1482                 el->length += len;
1483                 return 0;
1484         }
1485
1486         /* add a new entry at the end of the list */
1487         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1488         if (el == NULL) {
1489                 tdb->ecode = TDB_ERR_OOM;
1490                 tdb->transaction->transaction_error = 1;
1491                 return -1;
1492         }
1493         el->next = NULL;
1494         el->prev = tdb->transaction->elements_last;
1495         el->offset = off;
1496         el->length = len;
1497         el->data = (unsigned char *)malloc(len);
1498         if (el->data == NULL) {
1499                 free(el);
1500                 tdb->ecode = TDB_ERR_OOM;
1501                 tdb->transaction->transaction_error = 1;
1502                 return -1;
1503         }
1504         if (buf) {
1505                 memcpy(el->data, buf, len);
1506         } else {
1507                 memset(el->data, TDB_PAD_BYTE, len);
1508         }
1509         if (el->prev) {
1510                 el->prev->next = el;
1511         } else {
1512                 tdb->transaction->elements = el;
1513         }
1514         tdb->transaction->elements_last = el;
1515         return 0;
1516
1517 fail:
1518         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1519         tdb->ecode = TDB_ERR_IO;
1520         tdb->transaction->transaction_error = 1;
1521         return -1;
1522 }
1523
1524 /*
1525   accelerated hash chain head search, using the cached hash heads
1526 */
1527 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1528 {
1529         u32 h = *chain;
1530         for (;h < tdb->header.hash_size;h++) {
1531                 /* the +1 takes account of the freelist */
1532                 if (0 != tdb->transaction->hash_heads[h+1]) {
1533                         break;
1534                 }
1535         }
1536         (*chain) = h;
1537 }
1538
1539 /*
1540   out of bounds check during a transaction
1541 */
1542 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1543 {
1544         if (len <= tdb->map_size) {
1545                 return 0;
1546         }
1547         return TDB_ERRCODE(TDB_ERR_IO, -1);
1548 }
1549
1550 /*
1551   transaction version of tdb_expand().
1552 */
1553 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1554                                    tdb_off_t addition)
1555 {
1556         /* add a write to the transaction elements, so subsequent
1557            reads see the zero data */
1558         if (transaction_write(tdb, size, NULL, addition) != 0) {
1559                 return -1;
1560         }
1561
1562         return 0;
1563 }
1564
1565 /*
1566   brlock during a transaction - ignore them
1567 */
1568 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1569                               int rw_type, int lck_type, int probe, size_t len)
1570 {
1571         return 0;
1572 }
1573
1574 static const struct tdb_methods transaction_methods = {
1575         transaction_read,
1576         transaction_write,
1577         transaction_next_hash_chain,
1578         transaction_oob,
1579         transaction_expand_file,
1580         transaction_brlock
1581 };
1582
1583
1584 /*
1585   start a tdb transaction. No token is returned, as only a single
1586   transaction is allowed to be pending per tdb_context
1587 */
1588 int tdb_transaction_start(struct tdb_context *tdb)
1589 {
1590         /* some sanity checks */
1591         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1592                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1593                 tdb->ecode = TDB_ERR_EINVAL;
1594                 return -1;
1595         }
1596
1597         /* cope with nested tdb_transaction_start() calls */
1598         if (tdb->transaction != NULL) {
1599                 tdb->transaction->nesting++;
1600                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1601                          tdb->transaction->nesting));
1602                 return 0;
1603         }
1604
1605         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1606                 /* the caller must not have any locks when starting a
1607                    transaction as otherwise we'll be screwed by lack
1608                    of nested locks in posix */
1609                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1610                 tdb->ecode = TDB_ERR_LOCK;
1611                 return -1;
1612         }
1613
1614         if (tdb->travlocks.next != NULL) {
1615                 /* you cannot use transactions inside a traverse (although you can use
1616                    traverse inside a transaction) as otherwise you can end up with
1617                    deadlock */
1618                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1619                 tdb->ecode = TDB_ERR_LOCK;
1620                 return -1;
1621         }
1622
1623         tdb->transaction = (struct tdb_transaction *)
1624                 calloc(sizeof(struct tdb_transaction), 1);
1625         if (tdb->transaction == NULL) {
1626                 tdb->ecode = TDB_ERR_OOM;
1627                 return -1;
1628         }
1629
1630         /* get the transaction write lock. This is a blocking lock. As
1631            discussed with Volker, there are a number of ways we could
1632            make this async, which we will probably do in the future */
1633         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1634                 SAFE_FREE(tdb->transaction);
1635                 return -1;
1636         }
1637
1638         /* get a read lock from the freelist to the end of file. This
1639            is upgraded to a write lock during the commit */
1640         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1641                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1642                 tdb->ecode = TDB_ERR_LOCK;
1643                 goto fail;
1644         }
1645
1646         /* setup a copy of the hash table heads so the hash scan in
1647            traverse can be fast */
1648         tdb->transaction->hash_heads = (u32 *)
1649                 calloc(tdb->header.hash_size+1, sizeof(u32));
1650         if (tdb->transaction->hash_heads == NULL) {
1651                 tdb->ecode = TDB_ERR_OOM;
1652                 goto fail;
1653         }
1654         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1655                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1656                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1657                 tdb->ecode = TDB_ERR_IO;
1658                 goto fail;
1659         }
1660
1661         /* make sure we know about any file expansions already done by
1662            anyone else */
1663         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1664         tdb->transaction->old_map_size = tdb->map_size;
1665
1666         /* finally hook the io methods, replacing them with
1667            transaction specific methods */
1668         tdb->transaction->io_methods = tdb->methods;
1669         tdb->methods = &transaction_methods;
1670
1671         /* by calling this transaction write here, we ensure that we don't grow the
1672            transaction linked list due to hash table updates */
1673         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1674                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1675                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1676                 tdb->ecode = TDB_ERR_IO;
1677                 tdb->methods = tdb->transaction->io_methods;
1678                 goto fail;
1679         }
1680
1681         return 0;
1682
1683 fail:
1684         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1685         tdb_transaction_unlock(tdb);
1686         SAFE_FREE(tdb->transaction->hash_heads);
1687         SAFE_FREE(tdb->transaction);
1688         return -1;
1689 }
1690
1691
1692 /*
1693   cancel the current transaction
1694 */
1695 int tdb_transaction_cancel(struct tdb_context *tdb)
1696 {
1697         if (tdb->transaction == NULL) {
1698                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1699                 return -1;
1700         }
1701
1702         if (tdb->transaction->nesting != 0) {
1703                 tdb->transaction->transaction_error = 1;
1704                 tdb->transaction->nesting--;
1705                 return 0;
1706         }
1707
1708         tdb->map_size = tdb->transaction->old_map_size;
1709
1710         /* free all the transaction elements */
1711         while (tdb->transaction->elements) {
1712                 struct tdb_transaction_el *el = tdb->transaction->elements;
1713                 tdb->transaction->elements = el->next;
1714                 free(el->data);
1715                 free(el);
1716         }
1717
1718         /* remove any global lock created during the transaction */
1719         if (tdb->global_lock.count != 0) {
1720                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1721                 tdb->global_lock.count = 0;
1722         }
1723
1724         /* remove any locks created during the transaction */
1725         if (tdb->num_locks != 0) {
1726                 int i;
1727                 for (i=0;i<tdb->num_lockrecs;i++) {
1728                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1729                                    F_UNLCK,F_SETLKW, 0, 1);
1730                 }
1731                 tdb->num_locks = 0;
1732                 tdb->num_lockrecs = 0;
1733                 SAFE_FREE(tdb->lockrecs);
1734         }
1735
1736         /* restore the normal io methods */
1737         tdb->methods = tdb->transaction->io_methods;
1738
1739         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1740         tdb_transaction_unlock(tdb);
1741         SAFE_FREE(tdb->transaction->hash_heads);
1742         SAFE_FREE(tdb->transaction);
1743
1744         return 0;
1745 }
1746
1747 /*
1748   sync to disk
1749 */
1750 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1751 {
1752         if (fsync(tdb->fd) != 0) {
1753                 tdb->ecode = TDB_ERR_IO;
1754                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1755                 return -1;
1756         }
1757 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
1758         if (tdb->map_ptr) {
1759                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1760                 if (msync(moffset + (char *)tdb->map_ptr,
1761                           length + (offset - moffset), MS_SYNC) != 0) {
1762                         tdb->ecode = TDB_ERR_IO;
1763                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1764                                  strerror(errno)));
1765                         return -1;
1766                 }
1767         }
1768 #endif
1769         return 0;
1770 }
1771
1772
1773 /*
1774   work out how much space the linearised recovery data will consume
1775 */
1776 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1777 {
1778         struct tdb_transaction_el *el;
1779         tdb_len_t recovery_size = 0;
1780
1781         recovery_size = sizeof(u32);
1782         for (el=tdb->transaction->elements;el;el=el->next) {
1783                 if (el->offset >= tdb->transaction->old_map_size) {
1784                         continue;
1785                 }
1786                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1787         }
1788
1789         return recovery_size;
1790 }
1791
1792 /*
1793   allocate the recovery area, or use an existing recovery area if it is
1794   large enough
1795 */
1796 static int tdb_recovery_allocate(struct tdb_context *tdb,
1797                                  tdb_len_t *recovery_size,
1798                                  tdb_off_t *recovery_offset,
1799                                  tdb_len_t *recovery_max_size)
1800 {
1801         struct list_struct rec;
1802         const struct tdb_methods *methods = tdb->transaction->io_methods;
1803         tdb_off_t recovery_head;
1804
1805         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1806                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1807                 return -1;
1808         }
1809
1810         rec.rec_len = 0;
1811
1812         if (recovery_head != 0 &&
1813             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1814                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1815                 return -1;
1816         }
1817
1818         *recovery_size = tdb_recovery_size(tdb);
1819
1820         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1821                 /* it fits in the existing area */
1822                 *recovery_max_size = rec.rec_len;
1823                 *recovery_offset = recovery_head;
1824                 return 0;
1825         }
1826
1827         /* we need to free up the old recovery area, then allocate a
1828            new one at the end of the file. Note that we cannot use
1829            tdb_allocate() to allocate the new one as that might return
1830            us an area that is being currently used (as of the start of
1831            the transaction) */
1832         if (recovery_head != 0) {
1833                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1834                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1835                         return -1;
1836                 }
1837         }
1838
1839         /* the tdb_free() call might have increased the recovery size */
1840         *recovery_size = tdb_recovery_size(tdb);
1841
1842         /* round up to a multiple of page size */
1843         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1844         *recovery_offset = tdb->map_size;
1845         recovery_head = *recovery_offset;
1846
1847         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1848                                      (tdb->map_size - tdb->transaction->old_map_size) +
1849                                      sizeof(rec) + *recovery_max_size) == -1) {
1850                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1851                 return -1;
1852         }
1853
1854         /* remap the file (if using mmap) */
1855         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1856
1857         /* we have to reset the old map size so that we don't try to expand the file
1858            again in the transaction commit, which would destroy the recovery area */
1859         tdb->transaction->old_map_size = tdb->map_size;
1860
1861         /* write the recovery header offset and sync - we can sync without a race here
1862            as the magic ptr in the recovery record has not been set */
1863         CONVERT(recovery_head);
1864         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1865                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1866                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1867                 return -1;
1868         }
1869
1870         return 0;
1871 }
1872
1873
1874 /*
1875   setup the recovery data that will be used on a crash during commit
1876 */
1877 static int transaction_setup_recovery(struct tdb_context *tdb,
1878                                       tdb_off_t *magic_offset)
1879 {
1880         struct tdb_transaction_el *el;
1881         tdb_len_t recovery_size;
1882         unsigned char *data, *p;
1883         const struct tdb_methods *methods = tdb->transaction->io_methods;
1884         struct list_struct *rec;
1885         tdb_off_t recovery_offset, recovery_max_size;
1886         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1887         u32 magic, tailer;
1888
1889         /*
1890           check that the recovery area has enough space
1891         */
1892         if (tdb_recovery_allocate(tdb, &recovery_size,
1893                                   &recovery_offset, &recovery_max_size) == -1) {
1894                 return -1;
1895         }
1896
1897         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1898         if (data == NULL) {
1899                 tdb->ecode = TDB_ERR_OOM;
1900                 return -1;
1901         }
1902
1903         rec = (struct list_struct *)data;
1904         memset(rec, 0, sizeof(*rec));
1905
1906         rec->magic    = 0;
1907         rec->data_len = recovery_size;
1908         rec->rec_len  = recovery_max_size;
1909         rec->key_len  = old_map_size;
1910         CONVERT(rec);
1911
1912         /* build the recovery data into a single blob to allow us to do a single
1913            large write, which should be more efficient */
1914         p = data + sizeof(*rec);
1915         for (el=tdb->transaction->elements;el;el=el->next) {
1916                 if (el->offset >= old_map_size) {
1917                         continue;
1918                 }
1919                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1920                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1921                         free(data);
1922                         tdb->ecode = TDB_ERR_CORRUPT;
1923                         return -1;
1924                 }
1925                 memcpy(p, &el->offset, 4);
1926                 memcpy(p+4, &el->length, 4);
1927                 if (DOCONV()) {
1928                         tdb_convert(p, 8);
1929                 }
1930                 /* the recovery area contains the old data, not the
1931                    new data, so we have to call the original tdb_read
1932                    method to get it */
1933                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1934                         free(data);
1935                         tdb->ecode = TDB_ERR_IO;
1936                         return -1;
1937                 }
1938                 p += 8 + el->length;
1939         }
1940
1941         /* and the tailer */
1942         tailer = sizeof(*rec) + recovery_max_size;
1943         memcpy(p, &tailer, 4);
1944         CONVERT(p);
1945
1946         /* write the recovery data to the recovery area */
1947         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1948                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1949                 free(data);
1950                 tdb->ecode = TDB_ERR_IO;
1951                 return -1;
1952         }
1953
1954         /* as we don't have ordered writes, we have to sync the recovery
1955            data before we update the magic to indicate that the recovery
1956            data is present */
1957         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1958                 free(data);
1959                 return -1;
1960         }
1961
1962         free(data);
1963
1964         magic = TDB_RECOVERY_MAGIC;
1965         CONVERT(magic);
1966
1967         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1968
1969         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1970                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1971                 tdb->ecode = TDB_ERR_IO;
1972                 return -1;
1973         }
1974
1975         /* ensure the recovery magic marker is on disk */
1976         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1977                 return -1;
1978         }
1979
1980         return 0;
1981 }
1982
1983 /*
1984   commit the current transaction
1985 */
1986 int tdb_transaction_commit(struct tdb_context *tdb)
1987 {
1988         const struct tdb_methods *methods;
1989         tdb_off_t magic_offset = 0;
1990         u32 zero = 0;
1991
1992         if (tdb->transaction == NULL) {
1993                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1994                 return -1;
1995         }
1996
1997         if (tdb->transaction->transaction_error) {
1998                 tdb->ecode = TDB_ERR_IO;
1999                 tdb_transaction_cancel(tdb);
2000                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
2001                 return -1;
2002         }
2003
2004         if (tdb->transaction->nesting != 0) {
2005                 tdb->transaction->nesting--;
2006                 return 0;
2007         }
2008
2009         /* check for a null transaction */
2010         if (tdb->transaction->elements == NULL) {
2011                 tdb_transaction_cancel(tdb);
2012                 return 0;
2013         }
2014
2015         methods = tdb->transaction->io_methods;
2016
2017         /* if there are any locks pending then the caller has not
2018            nested their locks properly, so fail the transaction */
2019         if (tdb->num_locks || tdb->global_lock.count) {
2020                 tdb->ecode = TDB_ERR_LOCK;
2021                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2022                 tdb_transaction_cancel(tdb);
2023                 return -1;
2024         }
2025
2026         /* upgrade the main transaction lock region to a write lock */
2027         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2028                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2029                 tdb->ecode = TDB_ERR_LOCK;
2030                 tdb_transaction_cancel(tdb);
2031                 return -1;
2032         }
2033
2034         /* get the global lock - this prevents new users attaching to the database
2035            during the commit */
2036         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2037                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2038                 tdb->ecode = TDB_ERR_LOCK;
2039                 tdb_transaction_cancel(tdb);
2040                 return -1;
2041         }
2042
2043         if (!(tdb->flags & TDB_NOSYNC)) {
2044                 /* write the recovery data to the end of the file */
2045                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2046                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2047                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2048                         tdb_transaction_cancel(tdb);
2049                         return -1;
2050                 }
2051         }
2052
2053         /* expand the file to the new size if needed */
2054         if (tdb->map_size != tdb->transaction->old_map_size) {
2055                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2056                                              tdb->map_size -
2057                                              tdb->transaction->old_map_size) == -1) {
2058                         tdb->ecode = TDB_ERR_IO;
2059                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2060                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2061                         tdb_transaction_cancel(tdb);
2062                         return -1;
2063                 }
2064                 tdb->map_size = tdb->transaction->old_map_size;
2065                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2066         }
2067
2068         /* perform all the writes */
2069         while (tdb->transaction->elements) {
2070                 struct tdb_transaction_el *el = tdb->transaction->elements;
2071
2072                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2073                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2074
2075                         /* we've overwritten part of the data and
2076                            possibly expanded the file, so we need to
2077                            run the crash recovery code */
2078                         tdb->methods = methods;
2079                         tdb_transaction_recover(tdb);
2080
2081                         tdb_transaction_cancel(tdb);
2082                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2083
2084                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2085                         return -1;
2086                 }
2087                 tdb->transaction->elements = el->next;
2088                 free(el->data);
2089                 free(el);
2090         }
2091
2092         if (!(tdb->flags & TDB_NOSYNC)) {
2093                 /* ensure the new data is on disk */
2094                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2095                         return -1;
2096                 }
2097
2098                 /* remove the recovery marker */
2099                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2100                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2101                         return -1;
2102                 }
2103
2104                 /* ensure the recovery marker has been removed on disk */
2105                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2106                         return -1;
2107                 }
2108         }
2109
2110         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2111
2112         /*
2113           TODO: maybe write to some dummy hdr field, or write to magic
2114           offset without mmap, before the last sync, instead of the
2115           utime() call
2116         */
2117
2118         /* on some systems (like Linux 2.6.x) changes via mmap/msync
2119            don't change the mtime of the file, this means the file may
2120            not be backed up (as tdb rounding to block sizes means that
2121            file size changes are quite rare too). The following forces
2122            mtime changes when a transaction completes */
2123 #ifdef HAVE_UTIME
2124         utime(tdb->name, NULL);
2125 #endif
2126
2127         /* use a transaction cancel to free memory and remove the
2128            transaction locks */
2129         tdb_transaction_cancel(tdb);
2130         return 0;
2131 }
2132
2133
2134 /*
2135   recover from an aborted transaction. Must be called with exclusive
2136   database write access already established (including the global
2137   lock to prevent new processes attaching)
2138 */
2139 int tdb_transaction_recover(struct tdb_context *tdb)
2140 {
2141         tdb_off_t recovery_head, recovery_eof;
2142         unsigned char *data, *p;
2143         u32 zero = 0;
2144         struct list_struct rec;
2145
2146         /* find the recovery area */
2147         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2148                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2149                 tdb->ecode = TDB_ERR_IO;
2150                 return -1;
2151         }
2152
2153         if (recovery_head == 0) {
2154                 /* we have never allocated a recovery record */
2155                 return 0;
2156         }
2157
2158         /* read the recovery record */
2159         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2160                                    sizeof(rec), DOCONV()) == -1) {
2161                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2162                 tdb->ecode = TDB_ERR_IO;
2163                 return -1;
2164         }
2165
2166         if (rec.magic != TDB_RECOVERY_MAGIC) {
2167                 /* there is no valid recovery data */
2168                 return 0;
2169         }
2170
2171         if (tdb->read_only) {
2172                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2173                 tdb->ecode = TDB_ERR_CORRUPT;
2174                 return -1;
2175         }
2176
2177         recovery_eof = rec.key_len;
2178
2179         data = (unsigned char *)malloc(rec.data_len);
2180         if (data == NULL) {
2181                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2182                 tdb->ecode = TDB_ERR_OOM;
2183                 return -1;
2184         }
2185
2186         /* read the full recovery data */
2187         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2188                                    rec.data_len, 0) == -1) {
2189                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2190                 tdb->ecode = TDB_ERR_IO;
2191                 return -1;
2192         }
2193
2194         /* recover the file data */
2195         p = data;
2196         while (p+8 < data + rec.data_len) {
2197                 u32 ofs, len;
2198                 if (DOCONV()) {
2199                         tdb_convert(p, 8);
2200                 }
2201                 memcpy(&ofs, p, 4);
2202                 memcpy(&len, p+4, 4);
2203
2204                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2205                         free(data);
2206                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2207                         tdb->ecode = TDB_ERR_IO;
2208                         return -1;
2209                 }
2210                 p += 8 + len;
2211         }
2212
2213         free(data);
2214
2215         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2216                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2217                 tdb->ecode = TDB_ERR_IO;
2218                 return -1;
2219         }
2220
2221         /* if the recovery area is after the recovered eof then remove it */
2222         if (recovery_eof <= recovery_head) {
2223                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2224                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2225                         tdb->ecode = TDB_ERR_IO;
2226                         return -1;
2227                 }
2228         }
2229
2230         /* remove the recovery magic */
2231         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2232                           &zero) == -1) {
2233                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2234                 tdb->ecode = TDB_ERR_IO;
2235                 return -1;
2236         }
2237
2238         /* reduce the file size to the old size */
2239         tdb_munmap(tdb);
2240         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2241                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2242                 tdb->ecode = TDB_ERR_IO;
2243                 return -1;
2244         }
2245         tdb->map_size = recovery_eof;
2246         tdb_mmap(tdb);
2247
2248         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2249                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2250                 tdb->ecode = TDB_ERR_IO;
2251                 return -1;
2252         }
2253
2254         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2255                  recovery_eof));
2256
2257         /* all done */
2258         return 0;
2259 }
2260
2261 /* file: freelist.c */
2262
2263 /* read a freelist record and check for simple errors */
2264 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2265 {
2266         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2267                 return -1;
2268
2269         if (rec->magic == TDB_MAGIC) {
2270                 /* this happens when a app is showdown while deleting a record - we should
2271                    not completely fail when this happens */
2272                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2273                          rec->magic, off));
2274                 rec->magic = TDB_FREE_MAGIC;
2275                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2276                         return -1;
2277         }
2278
2279         if (rec->magic != TDB_FREE_MAGIC) {
2280                 /* Ensure ecode is set for log fn. */
2281                 tdb->ecode = TDB_ERR_CORRUPT;
2282                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2283                            rec->magic, off));
2284                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2285         }
2286         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2287                 return -1;
2288         return 0;
2289 }
2290
2291
2292
2293 /* Remove an element from the freelist.  Must have alloc lock. */
2294 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2295 {
2296         tdb_off_t last_ptr, i;
2297
2298         /* read in the freelist top */
2299         last_ptr = FREELIST_TOP;
2300         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2301                 if (i == off) {
2302                         /* We've found it! */
2303                         return tdb_ofs_write(tdb, last_ptr, &next);
2304                 }
2305                 /* Follow chain (next offset is at start of record) */
2306                 last_ptr = i;
2307         }
2308         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2309         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2310 }
2311
2312
2313 /* update a record tailer (must hold allocation lock) */
2314 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2315                          const struct list_struct *rec)
2316 {
2317         tdb_off_t totalsize;
2318
2319         /* Offset of tailer from record header */
2320         totalsize = sizeof(*rec) + rec->rec_len;
2321         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2322                          &totalsize);
2323 }
2324
2325 /* Add an element into the freelist. Merge adjacent records if
2326    neccessary. */
2327 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2328 {
2329         tdb_off_t right, left;
2330
2331         /* Allocation and tailer lock */
2332         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2333                 return -1;
2334
2335         /* set an initial tailer, so if we fail we don't leave a bogus record */
2336         if (update_tailer(tdb, offset, rec) != 0) {
2337                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2338                 goto fail;
2339         }
2340
2341         /* Look right first (I'm an Australian, dammit) */
2342         right = offset + sizeof(*rec) + rec->rec_len;
2343         if (right + sizeof(*rec) <= tdb->map_size) {
2344                 struct list_struct r;
2345
2346                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2347                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2348                         goto left;
2349                 }
2350
2351                 /* If it's free, expand to include it. */
2352                 if (r.magic == TDB_FREE_MAGIC) {
2353                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2354                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2355                                 goto left;
2356                         }
2357                         rec->rec_len += sizeof(r) + r.rec_len;
2358                 }
2359         }
2360
2361 left:
2362         /* Look left */
2363         left = offset - sizeof(tdb_off_t);
2364         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2365                 struct list_struct l;
2366                 tdb_off_t leftsize;
2367
2368                 /* Read in tailer and jump back to header */
2369                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2370                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2371                         goto update;
2372                 }
2373
2374                 /* it could be uninitialised data */
2375                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2376                         goto update;
2377                 }
2378
2379                 left = offset - leftsize;
2380
2381                 /* Now read in record */
2382                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2383                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2384                         goto update;
2385                 }
2386
2387                 /* If it's free, expand to include it. */
2388                 if (l.magic == TDB_FREE_MAGIC) {
2389                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2390                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2391                                 goto update;
2392                         } else {
2393                                 offset = left;
2394                                 rec->rec_len += leftsize;
2395                         }
2396                 }
2397         }
2398
2399 update:
2400         if (update_tailer(tdb, offset, rec) == -1) {
2401                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2402                 goto fail;
2403         }
2404
2405         /* Now, prepend to free list */
2406         rec->magic = TDB_FREE_MAGIC;
2407
2408         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2409             tdb_rec_write(tdb, offset, rec) == -1 ||
2410             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2411                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2412                 goto fail;
2413         }
2414
2415         /* And we're done. */
2416         tdb_unlock(tdb, -1, F_WRLCK);
2417         return 0;
2418
2419  fail:
2420         tdb_unlock(tdb, -1, F_WRLCK);
2421         return -1;
2422 }
2423
2424
2425 /*
2426    the core of tdb_allocate - called when we have decided which
2427    free list entry to use
2428  */
2429 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2430                                 struct list_struct *rec, tdb_off_t last_ptr)
2431 {
2432         struct list_struct newrec;
2433         tdb_off_t newrec_ptr;
2434
2435         memset(&newrec, '\0', sizeof(newrec));
2436
2437         /* found it - now possibly split it up  */
2438         if (rec->rec_len > length + MIN_REC_SIZE) {
2439                 /* Length of left piece */
2440                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2441
2442                 /* Right piece to go on free list */
2443                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2444                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2445
2446                 /* And left record is shortened */
2447                 rec->rec_len = length;
2448         } else {
2449                 newrec_ptr = 0;
2450         }
2451
2452         /* Remove allocated record from the free list */
2453         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2454                 return 0;
2455         }
2456
2457         /* Update header: do this before we drop alloc
2458            lock, otherwise tdb_free() might try to
2459            merge with us, thinking we're free.
2460            (Thanks Jeremy Allison). */
2461         rec->magic = TDB_MAGIC;
2462         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2463                 return 0;
2464         }
2465
2466         /* Did we create new block? */
2467         if (newrec_ptr) {
2468                 /* Update allocated record tailer (we
2469                    shortened it). */
2470                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2471                         return 0;
2472                 }
2473
2474                 /* Free new record */
2475                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2476                         return 0;
2477                 }
2478         }
2479
2480         /* all done - return the new record offset */
2481         return rec_ptr;
2482 }
2483
2484 /* allocate some space from the free list. The offset returned points
2485    to a unconnected list_struct within the database with room for at
2486    least length bytes of total data
2487
2488    0 is returned if the space could not be allocated
2489  */
2490 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2491 {
2492         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2493         struct {
2494                 tdb_off_t rec_ptr, last_ptr;
2495                 tdb_len_t rec_len;
2496         } bestfit;
2497
2498         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2499                 return 0;
2500
2501         /* Extra bytes required for tailer */
2502         length += sizeof(tdb_off_t);
2503
2504  again:
2505         last_ptr = FREELIST_TOP;
2506
2507         /* read in the freelist top */
2508         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2509                 goto fail;
2510
2511         bestfit.rec_ptr = 0;
2512         bestfit.last_ptr = 0;
2513         bestfit.rec_len = 0;
2514
2515         /*
2516            this is a best fit allocation strategy. Originally we used
2517            a first fit strategy, but it suffered from massive fragmentation
2518            issues when faced with a slowly increasing record size.
2519          */
2520         while (rec_ptr) {
2521                 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2522                         goto fail;
2523                 }
2524
2525                 if (rec->rec_len >= length) {
2526                         if (bestfit.rec_ptr == 0 ||
2527                             rec->rec_len < bestfit.rec_len) {
2528                                 bestfit.rec_len = rec->rec_len;
2529                                 bestfit.rec_ptr = rec_ptr;
2530                                 bestfit.last_ptr = last_ptr;
2531                                 /* consider a fit to be good enough if
2532                                    we aren't wasting more than half
2533                                    the space */
2534                                 if (bestfit.rec_len < 2*length) {
2535                                         break;
2536                                 }
2537                         }
2538                 }
2539
2540                 /* move to the next record */
2541                 last_ptr = rec_ptr;
2542                 rec_ptr = rec->next;
2543         }
2544
2545         if (bestfit.rec_ptr != 0) {
2546                 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2547                         goto fail;
2548                 }
2549
2550                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2551                 tdb_unlock(tdb, -1, F_WRLCK);
2552                 return newrec_ptr;
2553         }
2554
2555         /* we didn't find enough space. See if we can expand the
2556            database and if we can then try again */
2557         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2558                 goto again;
2559  fail:
2560         tdb_unlock(tdb, -1, F_WRLCK);
2561         return 0;
2562 }
2563
2564 /* file: freelistcheck.c */
2565
2566 /* Check the freelist is good and contains no loops.
2567    Very memory intensive - only do this as a consistency
2568    checker. Heh heh - uses an in memory tdb as the storage
2569    for the "seen" record list. For some reason this strikes
2570    me as extremely clever as I don't have to write another tree
2571    data structure implementation :-).
2572  */
2573
2574 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2575 {
2576         TDB_DATA key, data;
2577
2578         memset(&data, '\0', sizeof(data));
2579         key.dptr = (unsigned char *)&rec_ptr;
2580         key.dsize = sizeof(rec_ptr);
2581         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2582 }
2583
2584 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2585 {
2586         struct tdb_context *mem_tdb = NULL;
2587         struct list_struct rec;
2588         tdb_off_t rec_ptr, last_ptr;
2589         int ret = -1;
2590
2591         *pnum_entries = 0;
2592
2593         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2594                                 TDB_INTERNAL, O_RDWR, 0600);
2595         if (!mem_tdb) {
2596                 return -1;
2597         }
2598
2599         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2600                 tdb_close(mem_tdb);
2601                 return 0;
2602         }
2603
2604         last_ptr = FREELIST_TOP;
2605
2606         /* Store the FREELIST_TOP record. */
2607         if (seen_insert(mem_tdb, last_ptr) == -1) {
2608                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2609                 goto fail;
2610         }
2611
2612         /* read in the freelist top */
2613         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2614                 goto fail;
2615         }
2616
2617         while (rec_ptr) {
2618
2619                 /* If we can't store this record (we've seen it
2620                    before) then the free list has a loop and must
2621                    be corrupt. */
2622
2623                 if (seen_insert(mem_tdb, rec_ptr)) {
2624                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2625                         goto fail;
2626                 }
2627
2628                 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2629                         goto fail;
2630                 }
2631
2632                 /* move to the next record */
2633                 last_ptr = rec_ptr;
2634                 rec_ptr = rec.next;
2635                 *pnum_entries += 1;
2636         }
2637
2638         ret = 0;
2639
2640   fail:
2641
2642         tdb_close(mem_tdb);
2643         tdb_unlock(tdb, -1, F_WRLCK);
2644         return ret;
2645 }
2646
2647 /* file: traverse.c */
2648
2649 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2650 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2651                          struct list_struct *rec)
2652 {
2653         int want_next = (tlock->off != 0);
2654
2655         /* Lock each chain from the start one. */
2656         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2657                 if (!tlock->off && tlock->hash != 0) {
2658                         /* this is an optimisation for the common case where
2659                            the hash chain is empty, which is particularly
2660                            common for the use of tdb with ldb, where large
2661                            hashes are used. In that case we spend most of our
2662                            time in tdb_brlock(), locking empty hash chains.
2663
2664                            To avoid this, we do an unlocked pre-check to see
2665                            if the hash chain is empty before starting to look
2666                            inside it. If it is empty then we can avoid that
2667                            hash chain. If it isn't empty then we can't believe
2668                            the value we get back, as we read it without a
2669                            lock, so instead we get the lock and re-fetch the
2670                            value below.
2671
2672                            Notice that not doing this optimisation on the
2673                            first hash chain is critical. We must guarantee
2674                            that we have done at least one fcntl lock at the
2675                            start of a search to guarantee that memory is
2676                            coherent on SMP systems. If records are added by
2677                            others during the search then thats OK, and we
2678                            could possibly miss those with this trick, but we
2679                            could miss them anyway without this trick, so the
2680                            semantics don't change.
2681
2682                            With a non-indexed ldb search this trick gains us a
2683                            factor of around 80 in speed on a linux 2.6.x
2684                            system (testing using ldbtest).
2685                         */
2686                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2687                         if (tlock->hash == tdb->header.hash_size) {
2688                                 continue;
2689                         }
2690                 }
2691
2692                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2693                         return -1;
2694
2695                 /* No previous record?  Start at top of chain. */
2696                 if (!tlock->off) {
2697                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2698                                      &tlock->off) == -1)
2699                                 goto fail;
2700                 } else {
2701                         /* Otherwise unlock the previous record. */
2702                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2703                                 goto fail;
2704                 }
2705
2706                 if (want_next) {
2707                         /* We have offset of old record: grab next */
2708                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2709                                 goto fail;
2710                         tlock->off = rec->next;
2711                 }
2712
2713                 /* Iterate through chain */
2714                 while( tlock->off) {
2715                         tdb_off_t current;
2716                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2717                                 goto fail;
2718
2719                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2720                         if (tlock->off == rec->next) {
2721                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2722                                 goto fail;
2723                         }
2724
2725                         if (!TDB_DEAD(rec)) {
2726                                 /* Woohoo: we found one! */
2727                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2728                                         goto fail;
2729                                 return tlock->off;
2730                         }
2731
2732                         /* Try to clean dead ones from old traverses */
2733                         current = tlock->off;
2734                         tlock->off = rec->next;
2735                         if (!(tdb->read_only || tdb->traverse_read) &&
2736                             tdb_do_delete(tdb, current, rec) != 0)
2737                                 goto fail;
2738                 }
2739                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2740                 want_next = 0;
2741         }
2742         /* We finished iteration without finding anything */
2743         return TDB_ERRCODE(TDB_SUCCESS, 0);
2744
2745  fail:
2746         tlock->off = 0;
2747         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2748                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2749         return -1;
2750 }
2751
2752 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2753    return -1 on error or the record count traversed
2754    if fn is NULL then it is not called
2755    a non-zero return value from fn() indicates that the traversal should stop
2756   */
2757 static int tdb_traverse_internal(struct tdb_context *tdb,
2758                                  tdb_traverse_func fn, void *private_data,
2759                                  struct tdb_traverse_lock *tl)
2760 {
2761         TDB_DATA key, dbuf;
2762         struct list_struct rec;
2763         int ret, count = 0;
2764
2765         /* This was in the initializaton, above, but the IRIX compiler
2766          * did not like it.  crh
2767          */
2768         tl->next = tdb->travlocks.next;
2769
2770         /* fcntl locks don't stack: beware traverse inside traverse */
2771         tdb->travlocks.next = tl;
2772
2773         /* tdb_next_lock places locks on the record returned, and its chain */
2774         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2775                 count++;
2776                 /* now read the full record */
2777                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2778                                           rec.key_len + rec.data_len);
2779                 if (!key.dptr) {
2780                         ret = -1;
2781                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2782                                 goto out;
2783                         if (tdb_unlock_record(tdb, tl->off) != 0)
2784                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2785                         goto out;
2786                 }
2787                 key.dsize = rec.key_len;
2788                 dbuf.dptr = key.dptr + rec.key_len;
2789                 dbuf.dsize = rec.data_len;
2790
2791                 /* Drop chain lock, call out */
2792                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2793                         ret = -1;
2794                         SAFE_FREE(key.dptr);
2795                         goto out;
2796                 }
2797                 if (fn && fn(tdb, key, dbuf, private_data)) {
2798                         /* They want us to terminate traversal */
2799                         ret = count;
2800                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2801                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2802                                 ret = -1;
2803                         }
2804                         SAFE_FREE(key.dptr);
2805                         goto out;
2806                 }
2807                 SAFE_FREE(key.dptr);
2808         }
2809 out:
2810         tdb->travlocks.next = tl->next;
2811         if (ret < 0)
2812                 return -1;
2813         else
2814                 return count;
2815 }
2816
2817
2818 /*
2819   a write style traverse - temporarily marks the db read only
2820 */
2821 int tdb_traverse_read(struct tdb_context *tdb,
2822                       tdb_traverse_func fn, void *private_data)
2823 {
2824         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2825         int ret;
2826
2827         /* we need to get a read lock on the transaction lock here to
2828            cope with the lock ordering semantics of solaris10 */
2829         if (tdb_transaction_lock(tdb, F_RDLCK)) {
2830                 return -1;
2831         }
2832
2833         tdb->traverse_read++;
2834         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2835         tdb->traverse_read--;
2836
2837         tdb_transaction_unlock(tdb);
2838
2839         return ret;
2840 }
2841
2842 /*
2843   a write style traverse - needs to get the transaction lock to
2844   prevent deadlocks
2845 */
2846 int tdb_traverse(struct tdb_context *tdb,
2847                  tdb_traverse_func fn, void *private_data)
2848 {
2849         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2850         int ret;
2851
2852         if (tdb->read_only || tdb->traverse_read) {
2853                 return tdb_traverse_read(tdb, fn, private_data);
2854         }
2855
2856         if (tdb_transaction_lock(tdb, F_WRLCK)) {
2857                 return -1;
2858         }
2859
2860         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2861
2862         tdb_transaction_unlock(tdb);
2863
2864         return ret;
2865 }
2866
2867
2868 /* find the first entry in the database and return its key */
2869 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2870 {
2871         TDB_DATA key;
2872         struct list_struct rec;
2873
2874         /* release any old lock */
2875         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2876                 return tdb_null;
2877         tdb->travlocks.off = tdb->travlocks.hash = 0;
2878         tdb->travlocks.lock_rw = F_RDLCK;
2879
2880         /* Grab first record: locks chain and returned record. */
2881         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2882                 return tdb_null;
2883         /* now read the key */
2884         key.dsize = rec.key_len;
2885         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2886
2887         /* Unlock the hash chain of the record we just read. */
2888         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2889                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2890         return key;
2891 }
2892
2893 /* find the next entry in the database, returning its key */
2894 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2895 {
2896         u32 oldhash;
2897         TDB_DATA key = tdb_null;
2898         struct list_struct rec;
2899         unsigned char *k = NULL;
2900
2901         /* Is locked key the old key?  If so, traverse will be reliable. */
2902         if (tdb->travlocks.off) {
2903                 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2904                         return tdb_null;
2905                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2906                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2907                                             rec.key_len))
2908                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2909                         /* No, it wasn't: unlock it and start from scratch */
2910                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2911                                 SAFE_FREE(k);
2912                                 return tdb_null;
2913                         }
2914                         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2915                                 SAFE_FREE(k);
2916                                 return tdb_null;
2917                         }
2918                         tdb->travlocks.off = 0;
2919                 }
2920
2921                 SAFE_FREE(k);
2922         }
2923
2924         if (!tdb->travlocks.off) {
2925                 /* No previous element: do normal find, and lock record */
2926                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2927                 if (!tdb->travlocks.off)
2928                         return tdb_null;
2929                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2930                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2931                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2932                         return tdb_null;
2933                 }
2934         }
2935         oldhash = tdb->travlocks.hash;
2936
2937         /* Grab next record: locks chain and returned record,
2938            unlocks old record */
2939         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2940                 key.dsize = rec.key_len;
2941                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2942                                           key.dsize);
2943                 /* Unlock the chain of this new record */
2944                 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2945                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2946         }
2947         /* Unlock the chain of old record */
2948         if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2949                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2950         return key;
2951 }
2952
2953 /* file: dump.c */
2954
2955 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2956                                  tdb_off_t offset)
2957 {
2958         struct list_struct rec;
2959         tdb_off_t tailer_ofs, tailer;
2960
2961         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2962                                    sizeof(rec), DOCONV()) == -1) {
2963                 printf("ERROR: failed to read record at %u\n", offset);
2964                 return 0;
2965         }
2966
2967         printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2968                "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2969                hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2970                rec.full_hash, rec.magic);
2971
2972         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2973
2974         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2975                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2976                 return rec.next;
2977         }
2978
2979         if (tailer != rec.rec_len + sizeof(rec)) {
2980                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2981                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2982         }
2983         return rec.next;
2984 }
2985
2986 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2987 {
2988         tdb_off_t rec_ptr, top;
2989
2990         top = TDB_HASH_TOP(i);
2991
2992         if (tdb_lock(tdb, i, F_WRLCK) != 0)
2993                 return -1;
2994
2995         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2996                 return tdb_unlock(tdb, i, F_WRLCK);
2997
2998         if (rec_ptr)
2999                 printf("hash=%d\n", i);
3000
3001         while (rec_ptr) {
3002                 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
3003         }
3004
3005         return tdb_unlock(tdb, i, F_WRLCK);
3006 }
3007
3008 void tdb_dump_all(struct tdb_context *tdb)
3009 {
3010         int i;
3011         for (i=0;i<tdb->header.hash_size;i++) {
3012                 tdb_dump_chain(tdb, i);
3013         }
3014         printf("freelist:\n");
3015         tdb_dump_chain(tdb, -1);
3016 }
3017
3018 int tdb_printfreelist(struct tdb_context *tdb)
3019 {
3020         int ret;
3021         long total_free = 0;
3022         tdb_off_t offset, rec_ptr;
3023         struct list_struct rec;
3024
3025         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3026                 return ret;
3027
3028         offset = FREELIST_TOP;
3029
3030         /* read in the freelist top */
3031         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3032                 tdb_unlock(tdb, -1, F_WRLCK);
3033                 return 0;
3034         }
3035
3036         printf("freelist top=[0x%08x]\n", rec_ptr );
3037         while (rec_ptr) {
3038                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3039                                            sizeof(rec), DOCONV()) == -1) {
3040                         tdb_unlock(tdb, -1, F_WRLCK);
3041                         return -1;
3042                 }
3043
3044                 if (rec.magic != TDB_FREE_MAGIC) {
3045                         printf("bad magic 0x%08x in free list\n", rec.magic);
3046                         tdb_unlock(tdb, -1, F_WRLCK);
3047                         return -1;
3048                 }
3049
3050                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3051                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3052                 total_free += rec.rec_len;
3053
3054                 /* move to the next record */
3055                 rec_ptr = rec.next;
3056         }
3057         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3058                (int)total_free);
3059
3060         return tdb_unlock(tdb, -1, F_WRLCK);
3061 }
3062
3063 /* file: tdb.c */
3064
3065 /*
3066   non-blocking increment of the tdb sequence number if the tdb has been opened using
3067   the TDB_SEQNUM flag
3068 */
3069 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3070 {
3071         tdb_off_t seqnum=0;
3072
3073         if (!(tdb->flags & TDB_SEQNUM)) {
3074                 return;
3075         }
3076
3077         /* we ignore errors from this, as we have no sane way of
3078            dealing with them.
3079         */
3080         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3081         seqnum++;
3082         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3083 }
3084
3085 /*
3086   increment the tdb sequence number if the tdb has been opened using
3087   the TDB_SEQNUM flag
3088 */
3089 static void tdb_increment_seqnum(struct tdb_context *tdb)
3090 {
3091         if (!(tdb->flags & TDB_SEQNUM)) {
3092                 return;
3093         }
3094
3095         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3096                 return;
3097         }
3098
3099         tdb_increment_seqnum_nonblock(tdb);
3100
3101         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3102 }
3103
3104 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3105 {
3106         return memcmp(data.dptr, key.dptr, data.dsize);
3107 }
3108
3109 /* Returns 0 on fail.  On success, return offset of record, and fills
3110    in rec */
3111 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3112                         struct list_struct *r)
3113 {
3114         tdb_off_t rec_ptr;
3115
3116         /* read in the hash top */
3117         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3118                 return 0;
3119
3120         /* keep looking until we find the right record */
3121         while (rec_ptr) {
3122                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3123                         return 0;
3124
3125                 if (!TDB_DEAD(r) && hash==r->full_hash
3126                     && key.dsize==r->key_len
3127                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3128                                       r->key_len, tdb_key_compare,
3129                                       NULL) == 0) {
3130                         return rec_ptr;
3131                 }
3132                 rec_ptr = r->next;
3133         }
3134         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3135 }
3136
3137 /* As tdb_find, but if you succeed, keep the lock */
3138 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3139                            struct list_struct *rec)
3140 {
3141         u32 rec_ptr;
3142
3143         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3144                 return 0;
3145         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3146                 tdb_unlock(tdb, BUCKET(hash), locktype);
3147         return rec_ptr;
3148 }
3149
3150
3151 /* update an entry in place - this only works if the new data size
3152    is <= the old data size and the key exists.
3153    on failure return -1.
3154 */
3155 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3156 {
3157         struct list_struct rec;
3158         tdb_off_t rec_ptr;
3159
3160         /* find entry */
3161         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3162                 return -1;
3163
3164         /* must be long enough key, data and tailer */
3165         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3166                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3167                 return -1;
3168         }
3169
3170         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3171                       dbuf.dptr, dbuf.dsize) == -1)
3172                 return -1;
3173
3174         if (dbuf.dsize != rec.data_len) {
3175                 /* update size */
3176                 rec.data_len = dbuf.dsize;
3177                 return tdb_rec_write(tdb, rec_ptr, &rec);
3178         }
3179
3180         return 0;
3181 }
3182
3183 /* find an entry in the database given a key */
3184 /* If an entry doesn't exist tdb_err will be set to
3185  * TDB_ERR_NOEXIST. If a key has no data attached
3186  * then the TDB_DATA will have zero length but
3187  * a non-zero pointer
3188  */
3189 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3190 {
3191         tdb_off_t rec_ptr;
3192         struct list_struct rec;
3193         TDB_DATA ret;
3194         u32 hash;
3195
3196         /* find which hash bucket it is in */
3197         hash = tdb->hash_fn(&key);
3198         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3199                 return tdb_null;
3200
3201         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3202                                   rec.data_len);
3203         ret.dsize = rec.data_len;
3204         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3205         return ret;
3206 }
3207
3208 /*
3209  * Find an entry in the database and hand the record's data to a parsing
3210  * function. The parsing function is executed under the chain read lock, so it
3211  * should be fast and should not block on other syscalls.
3212  *
3213  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3214  *
3215  * For mmapped tdb's that do not have a transaction open it points the parsing
3216  * function directly at the mmap area, it avoids the malloc/memcpy in this
3217  * case. If a transaction is open or no mmap is available, it has to do
3218  * malloc/read/parse/free.
3219  *
3220  * This is interesting for all readers of potentially large data structures in
3221  * the tdb records, ldb indexes being one example.
3222  */
3223
3224 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3225                      int (*parser)(TDB_DATA key, TDB_DATA data,
3226                                    void *private_data),
3227                      void *private_data)
3228 {
3229         tdb_off_t rec_ptr;
3230         struct list_struct rec;
3231         int ret;
3232         u32 hash;
3233
3234         /* find which hash bucket it is in */
3235         hash = tdb->hash_fn(&key);
3236
3237         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3238                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3239         }
3240
3241         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3242                              rec.data_len, parser, private_data);
3243
3244         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3245
3246         return ret;
3247 }
3248
3249 /* check if an entry in the database exists
3250
3251    note that 1 is returned if the key is found and 0 is returned if not found
3252    this doesn't match the conventions in the rest of this module, but is
3253    compatible with gdbm
3254 */
3255 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3256 {
3257         struct list_struct rec;
3258
3259         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3260                 return 0;
3261         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3262         return 1;
3263 }
3264
3265 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3266 {
3267         u32 hash = tdb->hash_fn(&key);
3268         return tdb_exists_hash(tdb, key, hash);
3269 }
3270
3271 /* actually delete an entry in the database given the offset */
3272 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3273 {
3274         tdb_off_t last_ptr, i;
3275         struct list_struct lastrec;
3276
3277         if (tdb->read_only || tdb->traverse_read) return -1;
3278
3279         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3280                 /* Someone traversing here: mark it as dead */
3281                 rec->magic = TDB_DEAD_MAGIC;
3282                 return tdb_rec_write(tdb, rec_ptr, rec);
3283         }
3284         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3285                 return -1;
3286
3287         /* find previous record in hash chain */
3288         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3289                 return -1;
3290         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3291                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3292                         return -1;
3293
3294         /* unlink it: next ptr is at start of record. */
3295         if (last_ptr == 0)
3296                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3297         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3298                 return -1;
3299
3300         /* recover the space */
3301         if (tdb_free(tdb, rec_ptr, rec) == -1)
3302                 return -1;
3303         return 0;
3304 }
3305
3306 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3307 {
3308         int res = 0;
3309         tdb_off_t rec_ptr;
3310         struct list_struct rec;
3311
3312         /* read in the hash top */
3313         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3314                 return 0;
3315
3316         while (rec_ptr) {
3317                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3318                         return 0;
3319
3320                 if (rec.magic == TDB_DEAD_MAGIC) {
3321                         res += 1;
3322                 }
3323                 rec_ptr = rec.next;
3324         }
3325         return res;
3326 }
3327
3328 /*
3329  * Purge all DEAD records from a hash chain
3330  */
3331 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3332 {
3333         int res = -1;
3334         struct list_struct rec;
3335         tdb_off_t rec_ptr;
3336
3337         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3338                 return -1;
3339         }
3340
3341         /* read in the hash top */
3342         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3343                 goto fail;
3344
3345         while (rec_ptr) {
3346                 tdb_off_t next;
3347
3348                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3349                         goto fail;
3350                 }
3351
3352                 next = rec.next;
3353
3354                 if (rec.magic == TDB_DEAD_MAGIC
3355                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3356                         goto fail;
3357                 }
3358                 rec_ptr = next;
3359         }
3360         res = 0;
3361  fail:
3362         tdb_unlock(tdb, -1, F_WRLCK);
3363         return res;
3364 }
3365
3366 /* delete an entry in the database given a key */
3367 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3368 {
3369         tdb_off_t rec_ptr;
3370         struct list_struct rec;
3371         int ret;
3372
3373         if (tdb->max_dead_records != 0) {
3374
3375                 /*
3376                  * Allow for some dead records per hash chain, mainly for
3377                  * tdb's with a very high create/delete rate like locking.tdb.
3378                  */
3379
3380                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3381                         return -1;
3382
3383                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3384                         /*
3385                          * Don't let the per-chain freelist grow too large,
3386                          * delete all existing dead records
3387                          */
3388                         tdb_purge_dead(tdb, hash);
3389                 }
3390
3391                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3392                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3393                         return -1;
3394                 }
3395
3396                 /*
3397                  * Just mark the record as dead.
3398                  */
3399                 rec.magic = TDB_DEAD_MAGIC;
3400                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3401         }
3402         else {
3403                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3404                                                    &rec)))
3405                         return -1;
3406
3407                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3408         }
3409
3410         if (ret == 0) {
3411                 tdb_increment_seqnum(tdb);
3412         }
3413
3414         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3415                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3416         return ret;
3417 }
3418
3419 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3420 {
3421         u32 hash = tdb->hash_fn(&key);
3422         return tdb_delete_hash(tdb, key, hash);
3423 }
3424
3425 /*
3426  * See if we have a dead record around with enough space
3427  */
3428 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3429                                struct list_struct *r, tdb_len_t length)
3430 {
3431         tdb_off_t rec_ptr;
3432
3433         /* read in the hash top */
3434         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3435                 return 0;
3436
3437         /* keep looking until we find the right record */
3438         while (rec_ptr) {
3439                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3440                         return 0;
3441
3442                 if (TDB_DEAD(r) && r->rec_len >= length) {
3443                         /*
3444                          * First fit for simple coding, TODO: change to best
3445                          * fit
3446                          */
3447                         return rec_ptr;
3448                 }
3449                 rec_ptr = r->next;
3450         }
3451         return 0;
3452 }
3453
3454 /* store an element in the database, replacing any existing element
3455    with the same key
3456
3457    return 0 on success, -1 on failure
3458 */
3459 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3460 {
3461         struct list_struct rec;
3462         u32 hash;
3463         tdb_off_t rec_ptr;
3464         char *p = NULL;
3465         int ret = -1;
3466
3467         if (tdb->read_only || tdb->traverse_read) {
3468                 tdb->ecode = TDB_ERR_RDONLY;
3469                 return -1;
3470         }
3471
3472         /* find which hash bucket it is in */
3473         hash = tdb->hash_fn(&key);
3474         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3475                 return -1;
3476
3477         /* check for it existing, on insert. */
3478         if (flag == TDB_INSERT) {
3479                 if (tdb_exists_hash(tdb, key, hash)) {
3480                         tdb->ecode = TDB_ERR_EXISTS;
3481                         goto fail;
3482                 }
3483         } else {
3484                 /* first try in-place update, on modify or replace. */
3485                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3486                         goto done;
3487                 }
3488                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3489                     flag == TDB_MODIFY) {
3490                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3491                          we should fail the store */
3492                         goto fail;
3493                 }
3494         }
3495         /* reset the error code potentially set by the tdb_update() */
3496         tdb->ecode = TDB_SUCCESS;
3497
3498         /* delete any existing record - if it doesn't exist we don't
3499            care.  Doing this first reduces fragmentation, and avoids
3500            coalescing with `allocated' block before it's updated. */
3501         if (flag != TDB_INSERT)
3502                 tdb_delete_hash(tdb, key, hash);
3503
3504         /* Copy key+value *before* allocating free space in case malloc
3505            fails and we are left with a dead spot in the tdb. */
3506
3507         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3508                 tdb->ecode = TDB_ERR_OOM;
3509                 goto fail;
3510         }
3511
3512         memcpy(p, key.dptr, key.dsize);
3513         if (dbuf.dsize)
3514                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3515
3516         if (tdb->max_dead_records != 0) {
3517                 /*
3518                  * Allow for some dead records per hash chain, look if we can
3519                  * find one that can hold the new record. We need enough space
3520                  * for key, data and tailer. If we find one, we don't have to
3521                  * consult the central freelist.
3522                  */
3523                 rec_ptr = tdb_find_dead(
3524                         tdb, hash, &rec,
3525                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3526
3527                 if (rec_ptr != 0) {
3528                         rec.key_len = key.dsize;
3529                         rec.data_len = dbuf.dsize;
3530                         rec.full_hash = hash;
3531                         rec.magic = TDB_MAGIC;
3532                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3533                             || tdb->methods->tdb_write(
3534                                     tdb, rec_ptr + sizeof(rec),
3535                                     p, key.dsize + dbuf.dsize) == -1) {
3536                                 goto fail;
3537                         }
3538                         goto done;
3539                 }
3540         }
3541
3542         /*
3543          * We have to allocate some space from the freelist, so this means we
3544          * have to lock it. Use the chance to purge all the DEAD records from
3545          * the hash chain under the freelist lock.
3546          */
3547
3548         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3549                 goto fail;
3550         }
3551
3552         if ((tdb->max_dead_records != 0)
3553             && (tdb_purge_dead(tdb, hash) == -1)) {
3554                 tdb_unlock(tdb, -1, F_WRLCK);
3555                 goto fail;
3556         }
3557
3558         /* we have to allocate some space */
3559         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3560
3561         tdb_unlock(tdb, -1, F_WRLCK);
3562
3563         if (rec_ptr == 0) {
3564                 goto fail;
3565         }
3566
3567         /* Read hash top into next ptr */
3568         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3569                 goto fail;
3570
3571         rec.key_len = key.dsize;
3572         rec.data_len = dbuf.dsize;
3573         rec.full_hash = hash;
3574         rec.magic = TDB_MAGIC;
3575
3576         /* write out and point the top of the hash chain at it */
3577         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3578             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3579             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3580                 /* Need to tdb_unallocate() here */
3581                 goto fail;
3582         }
3583
3584  done:
3585         ret = 0;
3586  fail:
3587         if (ret == 0) {
3588                 tdb_increment_seqnum(tdb);
3589         }
3590
3591         SAFE_FREE(p);
3592         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3593         return ret;
3594 }
3595
3596
3597 /* Append to an entry. Create if not exist. */
3598 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3599 {
3600         u32 hash;
3601         TDB_DATA dbuf;
3602         int ret = -1;
3603
3604         /* find which hash bucket it is in */
3605         hash = tdb->hash_fn(&key);
3606         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3607                 return -1;
3608
3609         dbuf = tdb_fetch(tdb, key);
3610
3611         if (dbuf.dptr == NULL) {
3612                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3613         } else {
3614                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3615                                                      dbuf.dsize + new_dbuf.dsize);
3616                 if (new_dptr == NULL) {
3617                         free(dbuf.dptr);
3618                 }
3619                 dbuf.dptr = new_dptr;
3620         }
3621
3622         if (dbuf.dptr == NULL) {
3623                 tdb->ecode = TDB_ERR_OOM;
3624                 goto failed;
3625         }
3626
3627         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3628         dbuf.dsize += new_dbuf.dsize;
3629
3630         ret = tdb_store(tdb, key, dbuf, 0);
3631
3632 failed:
3633         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3634         SAFE_FREE(dbuf.dptr);
3635         return ret;
3636 }
3637
3638
3639 /*
3640   return the name of the current tdb file
3641   useful for external logging functions
3642 */
3643 const char *tdb_name(struct tdb_context *tdb)
3644 {
3645         return tdb->name;
3646 }
3647
3648 /*
3649   return the underlying file descriptor being used by tdb, or -1
3650   useful for external routines that want to check the device/inode
3651   of the fd
3652 */
3653 int tdb_fd(struct tdb_context *tdb)
3654 {
3655         return tdb->fd;
3656 }
3657
3658 /*
3659   return the current logging function
3660   useful for external tdb routines that wish to log tdb errors
3661 */
3662 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3663 {
3664         return tdb->log.log_fn;
3665 }
3666
3667
3668 /*
3669   get the tdb sequence number. Only makes sense if the writers opened
3670   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3671   quickly, so it should only be used for a 'has something changed'
3672   test, not for code that relies on the count of the number of changes
3673   made. If you want a counter then use a tdb record.
3674
3675   The aim of this sequence number is to allow for a very lightweight
3676   test of a possible tdb change.
3677 */
3678 int tdb_get_seqnum(struct tdb_context *tdb)
3679 {
3680         tdb_off_t seqnum=0;
3681
3682         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3683         return seqnum;
3684 }
3685
3686 int tdb_hash_size(struct tdb_context *tdb)
3687 {
3688         return tdb->header.hash_size;
3689 }
3690
3691 size_t tdb_map_size(struct tdb_context *tdb)
3692 {
3693         return tdb->map_size;
3694 }
3695
3696 int tdb_get_flags(struct tdb_context *tdb)
3697 {
3698         return tdb->flags;
3699 }
3700
3701
3702 /*
3703   enable sequence number handling on an open tdb
3704 */
3705 void tdb_enable_seqnum(struct tdb_context *tdb)
3706 {
3707         tdb->flags |= TDB_SEQNUM;
3708 }
3709
3710 /* file: open.c */
3711
3712 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3713 static struct tdb_context *tdbs = NULL;
3714
3715
3716 /* This is from a hash algorithm suggested by Rogier Wolff */
3717 static unsigned int default_tdb_hash(TDB_DATA *key)
3718 {
3719         u32 value;      /* Used to compute the hash value.  */
3720         u32   i;        /* Used to cycle through random values. */
3721
3722         /* Set the initial value from the key size. */
3723         for (value = 0, i=0; i < key->dsize; i++)
3724                 value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3725
3726         return value;
3727 }
3728
3729
3730 /* initialise a new database with a specified hash size */
3731 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3732 {
3733         struct tdb_header *newdb;
3734         int size, ret = -1;
3735
3736         /* We make it up in memory, then write it out if not internal */
3737         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3738         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3739                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3740
3741         /* Fill in the header */
3742         newdb->version = TDB_VERSION;
3743         newdb->hash_size = hash_size;
3744         if (tdb->flags & TDB_INTERNAL) {
3745                 tdb->map_size = size;
3746                 tdb->map_ptr = (char *)newdb;
3747                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3748                 /* Convert the `ondisk' version if asked. */
3749                 CONVERT(*newdb);
3750                 return 0;
3751         }
3752         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3753                 goto fail;
3754
3755         if (ftruncate(tdb->fd, 0) == -1)
3756                 goto fail;
3757
3758         /* This creates an endian-converted header, as if read from disk */
3759         CONVERT(*newdb);
3760         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3761         /* Don't endian-convert the magic food! */
3762         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3763         if (write(tdb->fd, newdb, size) != size) {
3764                 ret = -1;
3765         } else {
3766                 ret = 0;
3767         }
3768
3769   fail:
3770         SAFE_FREE(newdb);
3771         return ret;
3772 }
3773
3774
3775
3776 static int tdb_already_open(dev_t device,
3777                             ino_t ino)
3778 {
3779         struct tdb_context *i;
3780
3781         for (i = tdbs; i; i = i->next) {
3782                 if (i->device == device && i->inode == ino) {
3783                         return 1;
3784                 }
3785         }
3786
3787         return 0;
3788 }
3789
3790 /* open the database, creating it if necessary
3791
3792    The open_flags and mode are passed straight to the open call on the
3793    database file. A flags value of O_WRONLY is invalid. The hash size
3794    is advisory, use zero for a default value.
3795
3796    Return is NULL on error, in which case errno is also set.  Don't
3797    try to call tdb_error or tdb_errname, just do strerror(errno).
3798
3799    @param name may be NULL for internal databases. */
3800 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3801                       int open_flags, mode_t mode)
3802 {
3803         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3804 }
3805
3806 /* a default logging function */
3807 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3808 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3809 {
3810 }
3811
3812
3813 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3814                                 int open_flags, mode_t mode,
3815                                 const struct tdb_logging_context *log_ctx,
3816                                 tdb_hash_func hash_fn)
3817 {
3818         struct tdb_context *tdb;
3819         struct stat st;
3820         int rev = 0, locked = 0;
3821         unsigned char *vp;
3822         u32 vertest;
3823
3824         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3825                 /* Can't log this */
3826                 errno = ENOMEM;
3827                 goto fail;
3828         }
3829         tdb_io_init(tdb);
3830         tdb->fd = -1;
3831         tdb->name = NULL;
3832         tdb->map_ptr = NULL;
3833         tdb->flags = tdb_flags;
3834         tdb->open_flags = open_flags;
3835         if (log_ctx) {
3836                 tdb->log = *log_ctx;
3837         } else {
3838                 tdb->log.log_fn = null_log_fn;
3839                 tdb->log.log_private = NULL;
3840         }
3841         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3842
3843         /* cache the page size */
3844         tdb->page_size = sysconf(_SC_PAGESIZE);
3845         if (tdb->page_size <= 0) {
3846                 tdb->page_size = 0x2000;
3847         }
3848
3849         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3850                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3851                          name));
3852                 errno = EINVAL;
3853                 goto fail;
3854         }
3855
3856         if (hash_size == 0)
3857                 hash_size = DEFAULT_HASH_SIZE;
3858         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3859                 tdb->read_only = 1;
3860                 /* read only databases don't do locking or clear if first */
3861                 tdb->flags |= TDB_NOLOCK;
3862                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3863         }
3864
3865         /* internal databases don't mmap or lock, and start off cleared */
3866         if (tdb->flags & TDB_INTERNAL) {
3867                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3868                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3869                 if (tdb_new_database(tdb, hash_size) != 0) {
3870                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3871                         goto fail;
3872                 }
3873                 goto internal;
3874         }
3875
3876         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3877                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3878                          name, strerror(errno)));
3879                 goto fail;      /* errno set by open(2) */
3880         }
3881
3882         /* ensure there is only one process initialising at once */
3883         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3884                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3885                          name, strerror(errno)));
3886                 goto fail;      /* errno set by tdb_brlock */
3887         }
3888
3889         /* we need to zero database if we are the only one with it open */
3890         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3891             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3892                 open_flags |= O_CREAT;
3893                 if (ftruncate(tdb->fd, 0) == -1) {
3894                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3895                                  "failed to truncate %s: %s\n",
3896                                  name, strerror(errno)));
3897                         goto fail; /* errno set by ftruncate */
3898                 }
3899         }
3900
3901         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3902             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3903             || (tdb->header.version != TDB_VERSION
3904                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3905                 /* its not a valid database - possibly initialise it */
3906                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3907                         errno = EIO; /* ie bad format or something */
3908                         goto fail;
3909                 }
3910                 rev = (tdb->flags & TDB_CONVERT);
3911         }
3912         vp = (unsigned char *)&tdb->header.version;
3913         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3914                   (((u32)vp[2]) << 8) | (u32)vp[3];
3915         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3916         if (!rev)
3917                 tdb->flags &= ~TDB_CONVERT;
3918         else {
3919                 tdb->flags |= TDB_CONVERT;
3920                 tdb_convert(&tdb->header, sizeof(tdb->header));
3921         }
3922         if (fstat(tdb->fd, &st) == -1)
3923                 goto fail;
3924
3925         if (tdb->header.rwlocks != 0) {
3926                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3927                 goto fail;
3928         }
3929
3930         /* Is it already in the open list?  If so, fail. */
3931         if (tdb_already_open(st.st_dev, st.st_ino)) {
3932                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3933                          "%s (%d,%d) is already open in this process\n",
3934                          name, (int)st.st_dev, (int)st.st_ino));
3935                 errno = EBUSY;
3936                 goto fail;
3937         }
3938
3939         if (!(tdb->name = (char *)strdup(name))) {
3940                 errno = ENOMEM;
3941                 goto fail;
3942         }
3943
3944         tdb->map_size = st.st_size;
3945         tdb->device = st.st_dev;
3946         tdb->inode = st.st_ino;
3947         tdb->max_dead_records = 0;
3948         tdb_mmap(tdb);
3949         if (locked) {
3950                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3951                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3952                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3953                                  name, strerror(errno)));
3954                         goto fail;
3955                 }
3956
3957         }
3958
3959         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3960            we didn't get the initial exclusive lock as we need to let all other
3961            users know we're using it. */
3962
3963         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3964                 /* leave this lock in place to indicate it's in use */
3965                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3966                         goto fail;
3967         }
3968
3969         /* if needed, run recovery */
3970         if (tdb_transaction_recover(tdb) == -1) {
3971                 goto fail;
3972         }
3973
3974  internal:
3975         /* Internal (memory-only) databases skip all the code above to
3976          * do with disk files, and resume here by releasing their
3977          * global lock and hooking into the active list. */
3978         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3979                 goto fail;
3980         tdb->next = tdbs;
3981         tdbs = tdb;
3982         return tdb;
3983
3984  fail:
3985         { int save_errno = errno;
3986
3987         if (!tdb)
3988                 return NULL;
3989
3990         if (tdb->map_ptr) {
3991                 if (tdb->flags & TDB_INTERNAL)
3992                         SAFE_FREE(tdb->map_ptr);
3993                 else
3994                         tdb_munmap(tdb);
3995         }
3996         SAFE_FREE(tdb->name);
3997         if (tdb->fd != -1)
3998                 if (close(tdb->fd) != 0)
3999                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
4000         SAFE_FREE(tdb);
4001         errno = save_errno;
4002         return NULL;
4003         }
4004 }
4005
4006 /*
4007  * Set the maximum number of dead records per hash chain
4008  */
4009
4010 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4011 {
4012         tdb->max_dead_records = max_dead;
4013 }
4014
4015 /**
4016  * Close a database.
4017  *
4018  * @returns -1 for error; 0 for success.
4019  **/
4020 int tdb_close(struct tdb_context *tdb)
4021 {
4022         struct tdb_context **i;
4023         int ret = 0;
4024
4025         if (tdb->transaction) {
4026                 tdb_transaction_cancel(tdb);
4027         }
4028
4029         if (tdb->map_ptr) {
4030                 if (tdb->flags & TDB_INTERNAL)
4031                         SAFE_FREE(tdb->map_ptr);
4032                 else
4033                         tdb_munmap(tdb);
4034         }
4035         SAFE_FREE(tdb->name);
4036         if (tdb->fd != -1)
4037                 ret = close(tdb->fd);
4038         SAFE_FREE(tdb->lockrecs);
4039
4040         /* Remove from contexts list */
4041         for (i = &tdbs; *i; i = &(*i)->next) {
4042                 if (*i == tdb) {
4043                         *i = tdb->next;
4044                         break;
4045                 }
4046         }
4047
4048         memset(tdb, 0, sizeof(*tdb));
4049         SAFE_FREE(tdb);
4050
4051         return ret;
4052 }
4053
4054 /* register a loging function */
4055 void tdb_set_logging_function(struct tdb_context *tdb,
4056                               const struct tdb_logging_context *log_ctx)
4057 {
4058         tdb->log = *log_ctx;
4059 }
4060
4061 void *tdb_get_logging_private(struct tdb_context *tdb)
4062 {
4063         return tdb->log.log_private;
4064 }
4065
4066 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4067    seek pointer from our parent and to re-establish locks */
4068 int tdb_reopen(struct tdb_context *tdb)
4069 {
4070         struct stat st;
4071
4072         if (tdb->flags & TDB_INTERNAL) {
4073                 return 0; /* Nothing to do. */
4074         }
4075
4076         if (tdb->num_locks != 0 || tdb->global_lock.count) {
4077                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4078                 goto fail;
4079         }
4080
4081         if (tdb->transaction != 0) {
4082                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4083                 goto fail;
4084         }
4085
4086         if (tdb_munmap(tdb) != 0) {
4087                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4088                 goto fail;
4089         }
4090         if (close(tdb->fd) != 0)
4091                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4092         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4093         if (tdb->fd == -1) {
4094                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4095                 goto fail;
4096         }
4097         if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4098             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4099                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4100                 goto fail;
4101         }
4102         if (fstat(tdb->fd, &st) != 0) {
4103                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4104                 goto fail;
4105         }
4106         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4107                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4108                 goto fail;
4109         }
4110         tdb_mmap(tdb);
4111
4112         return 0;
4113
4114 fail:
4115         tdb_close(tdb);
4116         return -1;
4117 }
4118
4119 /* reopen all tdb's */
4120 int tdb_reopen_all(int parent_longlived)
4121 {
4122         struct tdb_context *tdb;
4123
4124         for (tdb=tdbs; tdb; tdb = tdb->next) {
4125                 /*
4126                  * If the parent is longlived (ie. a
4127                  * parent daemon architecture), we know
4128                  * it will keep it's active lock on a
4129                  * tdb opened with CLEAR_IF_FIRST. Thus
4130                  * for child processes we don't have to
4131                  * add an active lock. This is essential
4132                  * to improve performance on systems that
4133                  * keep POSIX locks as a non-scalable data
4134                  * structure in the kernel.
4135                  */
4136                 if (parent_longlived) {
4137                         /* Ensure no clear-if-first. */
4138                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4139                 }
4140
4141                 if (tdb_reopen(tdb) != 0)
4142                         return -1;
4143         }
4144
4145         return 0;
4146 }
4147
4148 /**
4149  * Flush a database file from the page cache.
4150  **/
4151 int tdb_flush(struct tdb_context *tdb)
4152 {
4153         if (tdb->fd != -1)
4154                 return fsync(tdb->fd);
4155         return 0;
4156 }