Whamcloud - gitweb
libext2fs: document EXT2_FLAG_64BITS in ext2fs_open2()
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /*
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3 Rev: 23590
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5 */
6  /*
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #define _XOPEN_SOURCE 600
40
41 #include "config.h"
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <stdarg.h>
46 #include <stddef.h>
47 #include <errno.h>
48 #include <string.h>
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
51 #endif
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <time.h>
55 #ifdef HAVE_UTIME_H
56 #include <utime.h>
57 #endif
58 #include <sys/stat.h>
59 #include <sys/file.h>
60 #include <fcntl.h>
61
62 #ifdef HAVE_SYS_MMAN_H
63 #include <sys/mman.h>
64 #endif
65
66 #ifndef MAP_FILE
67 #define MAP_FILE 0
68 #endif
69
70 #ifndef MAP_FAILED
71 #define MAP_FAILED ((void *)-1)
72 #endif
73
74 #ifndef HAVE_STRDUP
75 #define strdup rep_strdup
76 static char *rep_strdup(const char *s)
77 {
78         char *ret;
79         int length;
80         if (!s)
81                 return NULL;
82
83         if (!length)
84                 length = strlen(s);
85
86         ret = malloc(length + 1);
87         if (ret) {
88                 strncpy(ret, s, length);
89                 ret[length] = '\0';
90         }
91         return ret;
92 }
93 #endif
94
95 #ifndef PRINTF_ATTRIBUTE
96 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
97 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
98  * the parameter containing the format, and a2 the index of the first
99  * argument. Note that some gcc 2.x versions don't handle this
100  * properly **/
101 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
102 #else
103 #define PRINTF_ATTRIBUTE(a1, a2)
104 #endif
105 #endif
106
107 typedef int bool;
108
109 #include "tdb.h"
110
111 static TDB_DATA tdb_null;
112
113 #ifndef u32
114 #define u32 unsigned
115 #endif
116
117 typedef u32 tdb_len_t;
118 typedef u32 tdb_off_t;
119
120 #ifndef offsetof
121 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
122 #endif
123
124 #define TDB_MAGIC_FOOD "TDB file\n"
125 #define TDB_VERSION (0x26011967 + 6)
126 #define TDB_MAGIC (0x26011999U)
127 #define TDB_FREE_MAGIC (~TDB_MAGIC)
128 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
129 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
130 #define TDB_ALIGNMENT 4
131 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
132 #define DEFAULT_HASH_SIZE 131
133 #define FREELIST_TOP (sizeof(struct tdb_header))
134 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
135 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
136 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
137 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
138 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
139 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
140 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
141 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
142 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
143 #define TDB_PAD_BYTE 0x42
144 #define TDB_PAD_U32  0x42424242
145
146 /* NB assumes there is a local variable called "tdb" that is the
147  * current context, also takes doubly-parenthesized print-style
148  * argument. */
149 #define TDB_LOG(x) tdb->log.log_fn x
150
151 /* lock offsets */
152 #define GLOBAL_LOCK      0
153 #define ACTIVE_LOCK      4
154 #define TRANSACTION_LOCK 8
155
156 /* free memory if the pointer is valid and zero the pointer */
157 #ifndef SAFE_FREE
158 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
159 #endif
160
161 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
162
163 #define DOCONV() (tdb->flags & TDB_CONVERT)
164 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
165
166
167 /* the body of the database is made of one list_struct for the free space
168    plus a separate data list for each hash value */
169 struct list_struct {
170         tdb_off_t next; /* offset of the next record in the list */
171         tdb_len_t rec_len; /* total byte length of record */
172         tdb_len_t key_len; /* byte length of key */
173         tdb_len_t data_len; /* byte length of data */
174         u32 full_hash; /* the full 32 bit hash of the key */
175         u32 magic;   /* try to catch errors */
176         /* the following union is implied:
177                 union {
178                         char record[rec_len];
179                         struct {
180                                 char key[key_len];
181                                 char data[data_len];
182                         }
183                         u32 totalsize; (tailer)
184                 }
185         */
186 };
187
188
189 /* this is stored at the front of every database */
190 struct tdb_header {
191         char magic_food[32]; /* for /etc/magic */
192         u32 version; /* version of the code */
193         u32 hash_size; /* number of hash entries */
194         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
195         tdb_off_t recovery_start; /* offset of transaction recovery region */
196         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
197         tdb_off_t reserved[29];
198 };
199
200 struct tdb_lock_type {
201         int list;
202         u32 count;
203         u32 ltype;
204 };
205
206 struct tdb_traverse_lock {
207         struct tdb_traverse_lock *next;
208         u32 off;
209         u32 hash;
210         int lock_rw;
211 };
212
213
214 struct tdb_methods {
215         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
216         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
217         void (*next_hash_chain)(struct tdb_context *, u32 *);
218         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
219         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
220         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
221 };
222
223 struct tdb_context {
224         char *name; /* the name of the database */
225         void *map_ptr; /* where it is currently mapped */
226         int fd; /* open file descriptor for the database */
227         tdb_len_t map_size; /* how much space has been mapped */
228         int read_only; /* opened read-only */
229         int traverse_read; /* read-only traversal */
230         struct tdb_lock_type global_lock;
231         int num_lockrecs;
232         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
233         enum TDB_ERROR ecode; /* error code for last tdb error */
234         struct tdb_header header; /* a cached copy of the header */
235         u32 flags; /* the flags passed to tdb_open */
236         struct tdb_traverse_lock travlocks; /* current traversal locks */
237         struct tdb_context *next; /* all tdbs to avoid multiple opens */
238         dev_t device;   /* uniquely identifies this tdb */
239         ino_t inode;    /* uniquely identifies this tdb */
240         struct tdb_logging_context log;
241         unsigned int (*hash_fn)(TDB_DATA *key);
242         int open_flags; /* flags used in the open - needed by reopen */
243         unsigned int num_locks; /* number of chain locks held */
244         const struct tdb_methods *methods;
245         struct tdb_transaction *transaction;
246         int page_size;
247         int max_dead_records;
248         bool have_transaction_lock;
249 };
250
251
252 /*
253   internal prototypes
254 */
255 static int tdb_munmap(struct tdb_context *tdb);
256 static void tdb_mmap(struct tdb_context *tdb);
257 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
258 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
259 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
260 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
261 static int tdb_transaction_unlock(struct tdb_context *tdb);
262 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
263 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
264 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
265 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267 static void *tdb_convert(void *buf, u32 size);
268 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
269 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
270 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
271 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
273 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
274 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
275 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
277 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
278 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
279                    tdb_off_t offset, tdb_len_t len,
280                    int (*parser)(TDB_DATA key, TDB_DATA data,
281                                  void *private_data),
282                    void *private_data);
283 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
284                            struct list_struct *rec);
285 static void tdb_io_init(struct tdb_context *tdb);
286 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
287 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
288                       struct list_struct *rec);
289
290
291 /* file: error.c */
292
293 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
294 {
295         return tdb->ecode;
296 }
297
298 static struct tdb_errname {
299         enum TDB_ERROR ecode; const char *estring;
300 } emap[] = { {TDB_SUCCESS, "Success"},
301              {TDB_ERR_CORRUPT, "Corrupt database"},
302              {TDB_ERR_IO, "IO Error"},
303              {TDB_ERR_LOCK, "Locking error"},
304              {TDB_ERR_OOM, "Out of memory"},
305              {TDB_ERR_EXISTS, "Record exists"},
306              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
307              {TDB_ERR_EINVAL, "Invalid parameter"},
308              {TDB_ERR_NOEXIST, "Record does not exist"},
309              {TDB_ERR_RDONLY, "write not permitted"} };
310
311 /* Error string for the last tdb error */
312 const char *tdb_errorstr(struct tdb_context *tdb)
313 {
314         u32 i;
315         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
316                 if (tdb->ecode == emap[i].ecode)
317                         return emap[i].estring;
318         return "Invalid error code";
319 }
320
321 /* file: lock.c */
322
323 #define TDB_MARK_LOCK 0x80000000
324
325 /* a byte range locking function - return 0 on success
326    this functions locks/unlocks 1 byte at the specified offset.
327
328    On error, errno is also set so that errors are passed back properly
329    through tdb_open().
330
331    note that a len of zero means lock to end of file
332 */
333 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
334                int rw_type, int lck_type, int probe, size_t len)
335 {
336         struct flock fl;
337         int ret;
338
339         if (tdb->flags & TDB_NOLOCK) {
340                 return 0;
341         }
342
343         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
344                 tdb->ecode = TDB_ERR_RDONLY;
345                 return -1;
346         }
347
348         fl.l_type = rw_type;
349         fl.l_whence = SEEK_SET;
350         fl.l_start = offset;
351         fl.l_len = len;
352         fl.l_pid = 0;
353
354         do {
355                 ret = fcntl(tdb->fd,lck_type,&fl);
356         } while (ret == -1 && errno == EINTR);
357
358         if (ret == -1) {
359                 /* Generic lock error. errno set by fcntl.
360                  * EAGAIN is an expected return from non-blocking
361                  * locks. */
362                 if (!probe && lck_type != F_SETLK) {
363                         /* Ensure error code is set for log fun to examine. */
364                         tdb->ecode = TDB_ERR_LOCK;
365                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
366                                  tdb->fd, offset, rw_type, lck_type, (int)len));
367                 }
368                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
369         }
370         return 0;
371 }
372
373
374 /*
375   upgrade a read lock to a write lock. This needs to be handled in a
376   special way as some OSes (such as solaris) have too conservative
377   deadlock detection and claim a deadlock when progress can be
378   made. For those OSes we may loop for a while.
379 */
380 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
381 {
382         int count = 1000;
383         while (count--) {
384                 struct timeval tv;
385                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
386                         return 0;
387                 }
388                 if (errno != EDEADLK) {
389                         break;
390                 }
391                 /* sleep for as short a time as we can - more portable than usleep() */
392                 tv.tv_sec = 0;
393                 tv.tv_usec = 1;
394                 select(0, NULL, NULL, NULL, &tv);
395         }
396         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
397         return -1;
398 }
399
400
401 /* lock a list in the database. list -1 is the alloc list */
402 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
403 {
404         struct tdb_lock_type *new_lck;
405         int i;
406         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
407
408         ltype &= ~TDB_MARK_LOCK;
409
410         /* a global lock allows us to avoid per chain locks */
411         if (tdb->global_lock.count &&
412             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
413                 return 0;
414         }
415
416         if (tdb->global_lock.count) {
417                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
418         }
419
420         if (list < -1 || list >= (int)tdb->header.hash_size) {
421                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
422                            list, ltype));
423                 return -1;
424         }
425         if (tdb->flags & TDB_NOLOCK)
426                 return 0;
427
428         for (i=0; i<tdb->num_lockrecs; i++) {
429                 if (tdb->lockrecs[i].list == list) {
430                         if (tdb->lockrecs[i].count == 0) {
431                                 /*
432                                  * Can't happen, see tdb_unlock(). It should
433                                  * be an assert.
434                                  */
435                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
436                                          "lck->count == 0 for list %d", list));
437                         }
438                         /*
439                          * Just increment the in-memory struct, posix locks
440                          * don't stack.
441                          */
442                         tdb->lockrecs[i].count++;
443                         return 0;
444                 }
445         }
446
447         new_lck = (struct tdb_lock_type *)realloc(
448                 tdb->lockrecs,
449                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
450         if (new_lck == NULL) {
451                 errno = ENOMEM;
452                 return -1;
453         }
454         tdb->lockrecs = new_lck;
455
456         /* Since fcntl locks don't nest, we do a lock for the first one,
457            and simply bump the count for future ones */
458         if (!mark_lock &&
459             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
460                                      0, 1)) {
461                 return -1;
462         }
463
464         tdb->num_locks++;
465
466         tdb->lockrecs[tdb->num_lockrecs].list = list;
467         tdb->lockrecs[tdb->num_lockrecs].count = 1;
468         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
469         tdb->num_lockrecs += 1;
470
471         return 0;
472 }
473
474 /* lock a list in the database. list -1 is the alloc list */
475 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
476 {
477         int ret;
478         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
479         if (ret) {
480                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
481                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
482         }
483         return ret;
484 }
485
486 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
487 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
488 {
489         return _tdb_lock(tdb, list, ltype, F_SETLK);
490 }
491
492
493 /* unlock the database: returns void because it's too late for errors. */
494         /* changed to return int it may be interesting to know there
495            has been an error  --simo */
496 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
497 {
498         int ret = -1;
499         int i;
500         struct tdb_lock_type *lck = NULL;
501         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
502
503         ltype &= ~TDB_MARK_LOCK;
504
505         /* a global lock allows us to avoid per chain locks */
506         if (tdb->global_lock.count &&
507             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
508                 return 0;
509         }
510
511         if (tdb->global_lock.count) {
512                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
513         }
514
515         if (tdb->flags & TDB_NOLOCK)
516                 return 0;
517
518         /* Sanity checks */
519         if (list < -1 || list >= (int)tdb->header.hash_size) {
520                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
521                 return ret;
522         }
523
524         for (i=0; i<tdb->num_lockrecs; i++) {
525                 if (tdb->lockrecs[i].list == list) {
526                         lck = &tdb->lockrecs[i];
527                         break;
528                 }
529         }
530
531         if ((lck == NULL) || (lck->count == 0)) {
532                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
533                 return -1;
534         }
535
536         if (lck->count > 1) {
537                 lck->count--;
538                 return 0;
539         }
540
541         /*
542          * This lock has count==1 left, so we need to unlock it in the
543          * kernel. We don't bother with decrementing the in-memory array
544          * element, we're about to overwrite it with the last array element
545          * anyway.
546          */
547
548         if (mark_lock) {
549                 ret = 0;
550         } else {
551                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
552                                                F_SETLKW, 0, 1);
553         }
554         tdb->num_locks--;
555
556         /*
557          * Shrink the array by overwriting the element just unlocked with the
558          * last array element.
559          */
560
561         if (tdb->num_lockrecs > 1) {
562                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
563         }
564         tdb->num_lockrecs -= 1;
565
566         /*
567          * We don't bother with realloc when the array shrinks, but if we have
568          * a completely idle tdb we should get rid of the locked array.
569          */
570
571         if (tdb->num_lockrecs == 0) {
572                 SAFE_FREE(tdb->lockrecs);
573         }
574
575         if (ret)
576                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
577         return ret;
578 }
579
580 /*
581   get the transaction lock
582  */
583 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
584 {
585         if (tdb->have_transaction_lock || tdb->global_lock.count) {
586                 return 0;
587         }
588         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
589                                      F_SETLKW, 0, 1) == -1) {
590                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
591                 tdb->ecode = TDB_ERR_LOCK;
592                 return -1;
593         }
594         tdb->have_transaction_lock = 1;
595         return 0;
596 }
597
598 /*
599   release the transaction lock
600  */
601 int tdb_transaction_unlock(struct tdb_context *tdb)
602 {
603         int ret;
604         if (!tdb->have_transaction_lock) {
605                 return 0;
606         }
607         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
608         if (ret == 0) {
609                 tdb->have_transaction_lock = 0;
610         }
611         return ret;
612 }
613
614
615
616
617 /* lock/unlock entire database */
618 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
619 {
620         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
621
622         ltype &= ~TDB_MARK_LOCK;
623
624         /* There are no locks on read-only dbs */
625         if (tdb->read_only || tdb->traverse_read)
626                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
627
628         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
629                 tdb->global_lock.count++;
630                 return 0;
631         }
632
633         if (tdb->global_lock.count) {
634                 /* a global lock of a different type exists */
635                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
636         }
637
638         if (tdb->num_locks != 0) {
639                 /* can't combine global and chain locks */
640                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
641         }
642
643         if (!mark_lock &&
644             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
645                                      0, 4*tdb->header.hash_size)) {
646                 if (op == F_SETLKW) {
647                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
648                 }
649                 return -1;
650         }
651
652         tdb->global_lock.count = 1;
653         tdb->global_lock.ltype = ltype;
654
655         return 0;
656 }
657
658
659
660 /* unlock entire db */
661 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
662 {
663         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
664
665         ltype &= ~TDB_MARK_LOCK;
666
667         /* There are no locks on read-only dbs */
668         if (tdb->read_only || tdb->traverse_read) {
669                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
670         }
671
672         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
673                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
674         }
675
676         if (tdb->global_lock.count > 1) {
677                 tdb->global_lock.count--;
678                 return 0;
679         }
680
681         if (!mark_lock &&
682             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
683                                      0, 4*tdb->header.hash_size)) {
684                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
685                 return -1;
686         }
687
688         tdb->global_lock.count = 0;
689         tdb->global_lock.ltype = 0;
690
691         return 0;
692 }
693
694 /* lock entire database with write lock */
695 int tdb_lockall(struct tdb_context *tdb)
696 {
697         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
698 }
699
700 /* lock entire database with write lock - mark only */
701 int tdb_lockall_mark(struct tdb_context *tdb)
702 {
703         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
704 }
705
706 /* unlock entire database with write lock - unmark only */
707 int tdb_lockall_unmark(struct tdb_context *tdb)
708 {
709         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
710 }
711
712 /* lock entire database with write lock - nonblocking varient */
713 int tdb_lockall_nonblock(struct tdb_context *tdb)
714 {
715         return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
716 }
717
718 /* unlock entire database with write lock */
719 int tdb_unlockall(struct tdb_context *tdb)
720 {
721         return _tdb_unlockall(tdb, F_WRLCK);
722 }
723
724 /* lock entire database with read lock */
725 int tdb_lockall_read(struct tdb_context *tdb)
726 {
727         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
728 }
729
730 /* lock entire database with read lock - nonblock varient */
731 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
732 {
733         return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
734 }
735
736 /* unlock entire database with read lock */
737 int tdb_unlockall_read(struct tdb_context *tdb)
738 {
739         return _tdb_unlockall(tdb, F_RDLCK);
740 }
741
742 /* lock/unlock one hash chain. This is meant to be used to reduce
743    contention - it cannot guarantee how many records will be locked */
744 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
745 {
746         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
747 }
748
749 /* lock/unlock one hash chain, non-blocking. This is meant to be used
750    to reduce contention - it cannot guarantee how many records will be
751    locked */
752 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
753 {
754         return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
755 }
756
757 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
758 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
759 {
760         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
761 }
762
763 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
764 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
765 {
766         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
767 }
768
769 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
770 {
771         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
772 }
773
774 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
775 {
776         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
777 }
778
779 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
780 {
781         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
782 }
783
784
785
786 /* record lock stops delete underneath */
787 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
788 {
789         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
790 }
791
792 /*
793   Write locks override our own fcntl readlocks, so check it here.
794   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
795   an error to fail to get the lock here.
796 */
797 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
798 {
799         struct tdb_traverse_lock *i;
800         for (i = &tdb->travlocks; i; i = i->next)
801                 if (i->off == off)
802                         return -1;
803         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
804 }
805
806 /*
807   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
808   an error to fail to get the lock here.
809 */
810 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
811 {
812         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
813 }
814
815 /* fcntl locks don't stack: avoid unlocking someone else's */
816 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
817 {
818         struct tdb_traverse_lock *i;
819         u32 count = 0;
820
821         if (off == 0)
822                 return 0;
823         for (i = &tdb->travlocks; i; i = i->next)
824                 if (i->off == off)
825                         count++;
826         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
827 }
828
829 /* file: io.c */
830
831 /* check for an out of bounds access - if it is out of bounds then
832    see if the database has been expanded by someone else and expand
833    if necessary
834    note that "len" is the minimum length needed for the db
835 */
836 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
837 {
838         struct stat st;
839         if (len <= tdb->map_size)
840                 return 0;
841         if (tdb->flags & TDB_INTERNAL) {
842                 if (!probe) {
843                         /* Ensure ecode is set for log fn. */
844                         tdb->ecode = TDB_ERR_IO;
845                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
846                                  (int)len, (int)tdb->map_size));
847                 }
848                 return TDB_ERRCODE(TDB_ERR_IO, -1);
849         }
850
851         if (fstat(tdb->fd, &st) == -1) {
852                 return TDB_ERRCODE(TDB_ERR_IO, -1);
853         }
854
855         if (st.st_size < (size_t)len) {
856                 if (!probe) {
857                         /* Ensure ecode is set for log fn. */
858                         tdb->ecode = TDB_ERR_IO;
859                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
860                                  (int)len, (int)st.st_size));
861                 }
862                 return TDB_ERRCODE(TDB_ERR_IO, -1);
863         }
864
865         /* Unmap, update size, remap */
866         if (tdb_munmap(tdb) == -1)
867                 return TDB_ERRCODE(TDB_ERR_IO, -1);
868         tdb->map_size = st.st_size;
869         tdb_mmap(tdb);
870         return 0;
871 }
872
873 /* write a lump of data at a specified offset */
874 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
875                      const void *buf, tdb_len_t len)
876 {
877         if (len == 0) {
878                 return 0;
879         }
880
881         if (tdb->read_only || tdb->traverse_read) {
882                 tdb->ecode = TDB_ERR_RDONLY;
883                 return -1;
884         }
885
886         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
887                 return -1;
888
889         if (tdb->map_ptr) {
890                 memcpy(off + (char *)tdb->map_ptr, buf, len);
891         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
892                 /* Ensure ecode is set for log fn. */
893                 tdb->ecode = TDB_ERR_IO;
894                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
895                            off, len, strerror(errno)));
896                 return TDB_ERRCODE(TDB_ERR_IO, -1);
897         }
898         return 0;
899 }
900
901 /* Endian conversion: we only ever deal with 4 byte quantities */
902 void *tdb_convert(void *buf, u32 size)
903 {
904         u32 i, *p = (u32 *)buf;
905         for (i = 0; i < size / 4; i++)
906                 p[i] = TDB_BYTEREV(p[i]);
907         return buf;
908 }
909
910
911 /* read a lump of data at a specified offset, maybe convert */
912 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
913                     tdb_len_t len, int cv)
914 {
915         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
916                 return -1;
917         }
918
919         if (tdb->map_ptr) {
920                 memcpy(buf, off + (char *)tdb->map_ptr, len);
921         } else {
922                 ssize_t ret = pread(tdb->fd, buf, len, off);
923                 if (ret != (ssize_t)len) {
924                         /* Ensure ecode is set for log fn. */
925                         tdb->ecode = TDB_ERR_IO;
926                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
927                                  "len=%d ret=%d (%s) map_size=%d\n",
928                                  (int)off, (int)len, (int)ret, strerror(errno),
929                                  (int)tdb->map_size));
930                         return TDB_ERRCODE(TDB_ERR_IO, -1);
931                 }
932         }
933         if (cv) {
934                 tdb_convert(buf, len);
935         }
936         return 0;
937 }
938
939
940
941 /*
942   do an unlocked scan of the hash table heads to find the next non-zero head. The value
943   will then be confirmed with the lock held
944 */
945 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
946 {
947         u32 h = *chain;
948         if (tdb->map_ptr) {
949                 for (;h < tdb->header.hash_size;h++) {
950                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
951                                 break;
952                         }
953                 }
954         } else {
955                 u32 off=0;
956                 for (;h < tdb->header.hash_size;h++) {
957                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
958                                 break;
959                         }
960                 }
961         }
962         (*chain) = h;
963 }
964
965
966 int tdb_munmap(struct tdb_context *tdb)
967 {
968         if (tdb->flags & TDB_INTERNAL)
969                 return 0;
970
971 #ifdef HAVE_MMAP
972         if (tdb->map_ptr) {
973                 int ret = munmap(tdb->map_ptr, tdb->map_size);
974                 if (ret != 0)
975                         return ret;
976         }
977 #endif
978         tdb->map_ptr = NULL;
979         return 0;
980 }
981
982 void tdb_mmap(struct tdb_context *tdb)
983 {
984         if (tdb->flags & TDB_INTERNAL)
985                 return;
986
987 #ifdef HAVE_MMAP
988         if (!(tdb->flags & TDB_NOMMAP)) {
989                 tdb->map_ptr = mmap(NULL, tdb->map_size,
990                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE),
991                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
992
993                 /*
994                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
995                  */
996
997                 if (tdb->map_ptr == MAP_FAILED) {
998                         tdb->map_ptr = NULL;
999                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1000                                  tdb->map_size, strerror(errno)));
1001                 }
1002         } else {
1003                 tdb->map_ptr = NULL;
1004         }
1005 #else
1006         tdb->map_ptr = NULL;
1007 #endif
1008 }
1009
1010 /* expand a file.  we prefer to use ftruncate, as that is what posix
1011   says to use for mmap expansion */
1012 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1013 {
1014         char buf[1024];
1015
1016         if (tdb->read_only || tdb->traverse_read) {
1017                 tdb->ecode = TDB_ERR_RDONLY;
1018                 return -1;
1019         }
1020
1021         if (ftruncate(tdb->fd, size+addition) == -1) {
1022                 char b = 0;
1023                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1024                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1025                                  size+addition, strerror(errno)));
1026                         return -1;
1027                 }
1028         }
1029
1030         /* now fill the file with something. This ensures that the
1031            file isn't sparse, which would be very bad if we ran out of
1032            disk. This must be done with write, not via mmap */
1033         memset(buf, TDB_PAD_BYTE, sizeof(buf));
1034         while (addition) {
1035                 int n = addition>sizeof(buf)?sizeof(buf):addition;
1036                 int ret = pwrite(tdb->fd, buf, n, size);
1037                 if (ret != n) {
1038                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1039                                    n, strerror(errno)));
1040                         return -1;
1041                 }
1042                 addition -= n;
1043                 size += n;
1044         }
1045         return 0;
1046 }
1047
1048
1049 /* expand the database at least size bytes by expanding the underlying
1050    file and doing the mmap again if necessary */
1051 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1052 {
1053         struct list_struct rec;
1054         tdb_off_t offset;
1055
1056         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1057                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1058                 return -1;
1059         }
1060
1061         /* must know about any previous expansions by another process */
1062         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1063
1064         /* always make room for at least 10 more records, and round
1065            the database up to a multiple of the page size */
1066         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1067
1068         if (!(tdb->flags & TDB_INTERNAL))
1069                 tdb_munmap(tdb);
1070
1071         /*
1072          * We must ensure the file is unmapped before doing this
1073          * to ensure consistency with systems like OpenBSD where
1074          * writes and mmaps are not consistent.
1075          */
1076
1077         /* expand the file itself */
1078         if (!(tdb->flags & TDB_INTERNAL)) {
1079                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1080                         goto fail;
1081         }
1082
1083         tdb->map_size += size;
1084
1085         if (tdb->flags & TDB_INTERNAL) {
1086                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1087                                                     tdb->map_size);
1088                 if (!new_map_ptr) {
1089                         tdb->map_size -= size;
1090                         goto fail;
1091                 }
1092                 tdb->map_ptr = new_map_ptr;
1093         } else {
1094                 /*
1095                  * We must ensure the file is remapped before adding the space
1096                  * to ensure consistency with systems like OpenBSD where
1097                  * writes and mmaps are not consistent.
1098                  */
1099
1100                 /* We're ok if the mmap fails as we'll fallback to read/write */
1101                 tdb_mmap(tdb);
1102         }
1103
1104         /* form a new freelist record */
1105         memset(&rec,'\0',sizeof(rec));
1106         rec.rec_len = size - sizeof(rec);
1107
1108         /* link it into the free list */
1109         offset = tdb->map_size - size;
1110         if (tdb_free(tdb, offset, &rec) == -1)
1111                 goto fail;
1112
1113         tdb_unlock(tdb, -1, F_WRLCK);
1114         return 0;
1115  fail:
1116         tdb_unlock(tdb, -1, F_WRLCK);
1117         return -1;
1118 }
1119
1120 /* read/write a tdb_off_t */
1121 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1122 {
1123         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1124 }
1125
1126 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1127 {
1128         tdb_off_t off = *d;
1129         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1130 }
1131
1132
1133 /* read a lump of data, allocating the space for it */
1134 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1135 {
1136         unsigned char *buf;
1137
1138         /* some systems don't like zero length malloc */
1139         if (len == 0) {
1140                 len = 1;
1141         }
1142
1143         if (!(buf = (unsigned char *)malloc(len))) {
1144                 /* Ensure ecode is set for log fn. */
1145                 tdb->ecode = TDB_ERR_OOM;
1146                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1147                            len, strerror(errno)));
1148                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1149         }
1150         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1151                 SAFE_FREE(buf);
1152                 return NULL;
1153         }
1154         return buf;
1155 }
1156
1157 /* Give a piece of tdb data to a parser */
1158
1159 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1160                    tdb_off_t offset, tdb_len_t len,
1161                    int (*parser)(TDB_DATA key, TDB_DATA data,
1162                                  void *private_data),
1163                    void *private_data)
1164 {
1165         TDB_DATA data;
1166         int result;
1167
1168         data.dsize = len;
1169
1170         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1171                 /*
1172                  * Optimize by avoiding the malloc/memcpy/free, point the
1173                  * parser directly at the mmap area.
1174                  */
1175                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1176                         return -1;
1177                 }
1178                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1179                 return parser(key, data, private_data);
1180         }
1181
1182         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1183                 return -1;
1184         }
1185
1186         result = parser(key, data, private_data);
1187         free(data.dptr);
1188         return result;
1189 }
1190
1191 /* read/write a record */
1192 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1193 {
1194         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1195                 return -1;
1196         if (TDB_BAD_MAGIC(rec)) {
1197                 /* Ensure ecode is set for log fn. */
1198                 tdb->ecode = TDB_ERR_CORRUPT;
1199                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1200                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1201         }
1202         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1203 }
1204
1205 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1206 {
1207         struct list_struct r = *rec;
1208         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1209 }
1210
1211 static const struct tdb_methods io_methods = {
1212         tdb_read,
1213         tdb_write,
1214         tdb_next_hash_chain,
1215         tdb_oob,
1216         tdb_expand_file,
1217         tdb_brlock
1218 };
1219
1220 /*
1221   initialise the default methods table
1222 */
1223 void tdb_io_init(struct tdb_context *tdb)
1224 {
1225         tdb->methods = &io_methods;
1226 }
1227
1228 /* file: transaction.c */
1229
1230 /*
1231   transaction design:
1232
1233   - only allow a single transaction at a time per database. This makes
1234     using the transaction API simpler, as otherwise the caller would
1235     have to cope with temporary failures in transactions that conflict
1236     with other current transactions
1237
1238   - keep the transaction recovery information in the same file as the
1239     database, using a special 'transaction recovery' record pointed at
1240     by the header. This removes the need for extra journal files as
1241     used by some other databases
1242
1243   - dynamically allocated the transaction recover record, re-using it
1244     for subsequent transactions. If a larger record is needed then
1245     tdb_free() the old record to place it on the normal tdb freelist
1246     before allocating the new record
1247
1248   - during transactions, keep a linked list of writes all that have
1249     been performed by intercepting all tdb_write() calls. The hooked
1250     transaction versions of tdb_read() and tdb_write() check this
1251     linked list and try to use the elements of the list in preference
1252     to the real database.
1253
1254   - don't allow any locks to be held when a transaction starts,
1255     otherwise we can end up with deadlock (plus lack of lock nesting
1256     in posix locks would mean the lock is lost)
1257
1258   - if the caller gains a lock during the transaction but doesn't
1259     release it then fail the commit
1260
1261   - allow for nested calls to tdb_transaction_start(), re-using the
1262     existing transaction record. If the inner transaction is cancelled
1263     then a subsequent commit will fail
1264
1265   - keep a mirrored copy of the tdb hash chain heads to allow for the
1266     fast hash heads scan on traverse, updating the mirrored copy in
1267     the transaction version of tdb_write
1268
1269   - allow callers to mix transaction and non-transaction use of tdb,
1270     although once a transaction is started then an exclusive lock is
1271     gained until the transaction is committed or cancelled
1272
1273   - the commit stategy involves first saving away all modified data
1274     into a linearised buffer in the transaction recovery area, then
1275     marking the transaction recovery area with a magic value to
1276     indicate a valid recovery record. In total 4 fsync/msync calls are
1277     needed per commit to prevent race conditions. It might be possible
1278     to reduce this to 3 or even 2 with some more work.
1279
1280   - check for a valid recovery record on open of the tdb, while the
1281     global lock is held. Automatically recover from the transaction
1282     recovery area if needed, then continue with the open as
1283     usual. This allows for smooth crash recovery with no administrator
1284     intervention.
1285
1286   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1287     still available, but no transaction recovery area is used and no
1288     fsync/msync calls are made.
1289
1290 */
1291
1292 struct tdb_transaction_el {
1293         struct tdb_transaction_el *next, *prev;
1294         tdb_off_t offset;
1295         tdb_len_t length;
1296         unsigned char *data;
1297 };
1298
1299 /*
1300   hold the context of any current transaction
1301 */
1302 struct tdb_transaction {
1303         /* we keep a mirrored copy of the tdb hash heads here so
1304            tdb_next_hash_chain() can operate efficiently */
1305         u32 *hash_heads;
1306
1307         /* the original io methods - used to do IOs to the real db */
1308         const struct tdb_methods *io_methods;
1309
1310         /* the list of transaction elements. We use a doubly linked
1311            list with a last pointer to allow us to keep the list
1312            ordered, with first element at the front of the list. It
1313            needs to be doubly linked as the read/write traversals need
1314            to be backwards, while the commit needs to be forwards */
1315         struct tdb_transaction_el *elements, *elements_last;
1316
1317         /* non-zero when an internal transaction error has
1318            occurred. All write operations will then fail until the
1319            transaction is ended */
1320         int transaction_error;
1321
1322         /* when inside a transaction we need to keep track of any
1323            nested tdb_transaction_start() calls, as these are allowed,
1324            but don't create a new transaction */
1325         int nesting;
1326
1327         /* old file size before transaction */
1328         tdb_len_t old_map_size;
1329 };
1330
1331
1332 /*
1333   read while in a transaction. We need to check first if the data is in our list
1334   of transaction elements, then if not do a real read
1335 */
1336 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1337                             tdb_len_t len, int cv)
1338 {
1339         struct tdb_transaction_el *el;
1340
1341         /* we need to walk the list backwards to get the most recent data */
1342         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1343                 tdb_len_t partial;
1344
1345                 if (off+len <= el->offset) {
1346                         continue;
1347                 }
1348                 if (off >= el->offset + el->length) {
1349                         continue;
1350                 }
1351
1352                 /* an overlapping read - needs to be split into up to
1353                    2 reads and a memcpy */
1354                 if (off < el->offset) {
1355                         partial = el->offset - off;
1356                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1357                                 goto fail;
1358                         }
1359                         len -= partial;
1360                         off += partial;
1361                         buf = (void *)(partial + (char *)buf);
1362                 }
1363                 if (off + len <= el->offset + el->length) {
1364                         partial = len;
1365                 } else {
1366                         partial = el->offset + el->length - off;
1367                 }
1368                 memcpy(buf, el->data + (off - el->offset), partial);
1369                 if (cv) {
1370                         tdb_convert(buf, len);
1371                 }
1372                 len -= partial;
1373                 off += partial;
1374                 buf = (void *)(partial + (char *)buf);
1375
1376                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1377                         goto fail;
1378                 }
1379
1380                 return 0;
1381         }
1382
1383         /* its not in the transaction elements - do a real read */
1384         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1385
1386 fail:
1387         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1388         tdb->ecode = TDB_ERR_IO;
1389         tdb->transaction->transaction_error = 1;
1390         return -1;
1391 }
1392
1393
1394 /*
1395   write while in a transaction
1396 */
1397 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1398                              const void *buf, tdb_len_t len)
1399 {
1400         struct tdb_transaction_el *el, *best_el=NULL;
1401
1402         if (len == 0) {
1403                 return 0;
1404         }
1405
1406         /* if the write is to a hash head, then update the transaction
1407            hash heads */
1408         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1409             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1410                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1411                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1412         }
1413
1414         /* first see if we can replace an existing entry */
1415         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1416                 tdb_len_t partial;
1417
1418                 if (best_el == NULL && off == el->offset+el->length) {
1419                         best_el = el;
1420                 }
1421
1422                 if (off+len <= el->offset) {
1423                         continue;
1424                 }
1425                 if (off >= el->offset + el->length) {
1426                         continue;
1427                 }
1428
1429                 /* an overlapping write - needs to be split into up to
1430                    2 writes and a memcpy */
1431                 if (off < el->offset) {
1432                         partial = el->offset - off;
1433                         if (transaction_write(tdb, off, buf, partial) != 0) {
1434                                 goto fail;
1435                         }
1436                         len -= partial;
1437                         off += partial;
1438                         buf = (const void *)(partial + (const char *)buf);
1439                 }
1440                 if (off + len <= el->offset + el->length) {
1441                         partial = len;
1442                 } else {
1443                         partial = el->offset + el->length - off;
1444                 }
1445                 memcpy(el->data + (off - el->offset), buf, partial);
1446                 len -= partial;
1447                 off += partial;
1448                 buf = (const void *)(partial + (const char *)buf);
1449
1450                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1451                         goto fail;
1452                 }
1453
1454                 return 0;
1455         }
1456
1457         /* see if we can append the new entry to an existing entry */
1458         if (best_el && best_el->offset + best_el->length == off &&
1459             (off+len < tdb->transaction->old_map_size ||
1460              off > tdb->transaction->old_map_size)) {
1461                 unsigned char *data = best_el->data;
1462                 el = best_el;
1463                 el->data = (unsigned char *)realloc(el->data,
1464                                                     el->length + len);
1465                 if (el->data == NULL) {
1466                         tdb->ecode = TDB_ERR_OOM;
1467                         tdb->transaction->transaction_error = 1;
1468                         el->data = data;
1469                         return -1;
1470                 }
1471                 if (buf) {
1472                         memcpy(el->data + el->length, buf, len);
1473                 } else {
1474                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1475                 }
1476                 el->length += len;
1477                 return 0;
1478         }
1479
1480         /* add a new entry at the end of the list */
1481         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1482         if (el == NULL) {
1483                 tdb->ecode = TDB_ERR_OOM;
1484                 tdb->transaction->transaction_error = 1;
1485                 return -1;
1486         }
1487         el->next = NULL;
1488         el->prev = tdb->transaction->elements_last;
1489         el->offset = off;
1490         el->length = len;
1491         el->data = (unsigned char *)malloc(len);
1492         if (el->data == NULL) {
1493                 free(el);
1494                 tdb->ecode = TDB_ERR_OOM;
1495                 tdb->transaction->transaction_error = 1;
1496                 return -1;
1497         }
1498         if (buf) {
1499                 memcpy(el->data, buf, len);
1500         } else {
1501                 memset(el->data, TDB_PAD_BYTE, len);
1502         }
1503         if (el->prev) {
1504                 el->prev->next = el;
1505         } else {
1506                 tdb->transaction->elements = el;
1507         }
1508         tdb->transaction->elements_last = el;
1509         return 0;
1510
1511 fail:
1512         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1513         tdb->ecode = TDB_ERR_IO;
1514         tdb->transaction->transaction_error = 1;
1515         return -1;
1516 }
1517
1518 /*
1519   accelerated hash chain head search, using the cached hash heads
1520 */
1521 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1522 {
1523         u32 h = *chain;
1524         for (;h < tdb->header.hash_size;h++) {
1525                 /* the +1 takes account of the freelist */
1526                 if (0 != tdb->transaction->hash_heads[h+1]) {
1527                         break;
1528                 }
1529         }
1530         (*chain) = h;
1531 }
1532
1533 /*
1534   out of bounds check during a transaction
1535 */
1536 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1537 {
1538         if (len <= tdb->map_size) {
1539                 return 0;
1540         }
1541         return TDB_ERRCODE(TDB_ERR_IO, -1);
1542 }
1543
1544 /*
1545   transaction version of tdb_expand().
1546 */
1547 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1548                                    tdb_off_t addition)
1549 {
1550         /* add a write to the transaction elements, so subsequent
1551            reads see the zero data */
1552         if (transaction_write(tdb, size, NULL, addition) != 0) {
1553                 return -1;
1554         }
1555
1556         return 0;
1557 }
1558
1559 /*
1560   brlock during a transaction - ignore them
1561 */
1562 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1563                               int rw_type, int lck_type, int probe, size_t len)
1564 {
1565         return 0;
1566 }
1567
1568 static const struct tdb_methods transaction_methods = {
1569         transaction_read,
1570         transaction_write,
1571         transaction_next_hash_chain,
1572         transaction_oob,
1573         transaction_expand_file,
1574         transaction_brlock
1575 };
1576
1577
1578 /*
1579   start a tdb transaction. No token is returned, as only a single
1580   transaction is allowed to be pending per tdb_context
1581 */
1582 int tdb_transaction_start(struct tdb_context *tdb)
1583 {
1584         /* some sanity checks */
1585         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1586                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1587                 tdb->ecode = TDB_ERR_EINVAL;
1588                 return -1;
1589         }
1590
1591         /* cope with nested tdb_transaction_start() calls */
1592         if (tdb->transaction != NULL) {
1593                 tdb->transaction->nesting++;
1594                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1595                          tdb->transaction->nesting));
1596                 return 0;
1597         }
1598
1599         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1600                 /* the caller must not have any locks when starting a
1601                    transaction as otherwise we'll be screwed by lack
1602                    of nested locks in posix */
1603                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1604                 tdb->ecode = TDB_ERR_LOCK;
1605                 return -1;
1606         }
1607
1608         if (tdb->travlocks.next != NULL) {
1609                 /* you cannot use transactions inside a traverse (although you can use
1610                    traverse inside a transaction) as otherwise you can end up with
1611                    deadlock */
1612                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1613                 tdb->ecode = TDB_ERR_LOCK;
1614                 return -1;
1615         }
1616
1617         tdb->transaction = (struct tdb_transaction *)
1618                 calloc(sizeof(struct tdb_transaction), 1);
1619         if (tdb->transaction == NULL) {
1620                 tdb->ecode = TDB_ERR_OOM;
1621                 return -1;
1622         }
1623
1624         /* get the transaction write lock. This is a blocking lock. As
1625            discussed with Volker, there are a number of ways we could
1626            make this async, which we will probably do in the future */
1627         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1628                 SAFE_FREE(tdb->transaction);
1629                 return -1;
1630         }
1631
1632         /* get a read lock from the freelist to the end of file. This
1633            is upgraded to a write lock during the commit */
1634         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1635                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1636                 tdb->ecode = TDB_ERR_LOCK;
1637                 goto fail;
1638         }
1639
1640         /* setup a copy of the hash table heads so the hash scan in
1641            traverse can be fast */
1642         tdb->transaction->hash_heads = (u32 *)
1643                 calloc(tdb->header.hash_size+1, sizeof(u32));
1644         if (tdb->transaction->hash_heads == NULL) {
1645                 tdb->ecode = TDB_ERR_OOM;
1646                 goto fail;
1647         }
1648         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1649                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1650                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1651                 tdb->ecode = TDB_ERR_IO;
1652                 goto fail;
1653         }
1654
1655         /* make sure we know about any file expansions already done by
1656            anyone else */
1657         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1658         tdb->transaction->old_map_size = tdb->map_size;
1659
1660         /* finally hook the io methods, replacing them with
1661            transaction specific methods */
1662         tdb->transaction->io_methods = tdb->methods;
1663         tdb->methods = &transaction_methods;
1664
1665         /* by calling this transaction write here, we ensure that we don't grow the
1666            transaction linked list due to hash table updates */
1667         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1668                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1669                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1670                 tdb->ecode = TDB_ERR_IO;
1671                 tdb->methods = tdb->transaction->io_methods;
1672                 goto fail;
1673         }
1674
1675         return 0;
1676
1677 fail:
1678         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1679         tdb_transaction_unlock(tdb);
1680         SAFE_FREE(tdb->transaction->hash_heads);
1681         SAFE_FREE(tdb->transaction);
1682         return -1;
1683 }
1684
1685
1686 /*
1687   cancel the current transaction
1688 */
1689 int tdb_transaction_cancel(struct tdb_context *tdb)
1690 {
1691         if (tdb->transaction == NULL) {
1692                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1693                 return -1;
1694         }
1695
1696         if (tdb->transaction->nesting != 0) {
1697                 tdb->transaction->transaction_error = 1;
1698                 tdb->transaction->nesting--;
1699                 return 0;
1700         }
1701
1702         tdb->map_size = tdb->transaction->old_map_size;
1703
1704         /* free all the transaction elements */
1705         while (tdb->transaction->elements) {
1706                 struct tdb_transaction_el *el = tdb->transaction->elements;
1707                 tdb->transaction->elements = el->next;
1708                 free(el->data);
1709                 free(el);
1710         }
1711
1712         /* remove any global lock created during the transaction */
1713         if (tdb->global_lock.count != 0) {
1714                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1715                 tdb->global_lock.count = 0;
1716         }
1717
1718         /* remove any locks created during the transaction */
1719         if (tdb->num_locks != 0) {
1720                 int i;
1721                 for (i=0;i<tdb->num_lockrecs;i++) {
1722                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1723                                    F_UNLCK,F_SETLKW, 0, 1);
1724                 }
1725                 tdb->num_locks = 0;
1726                 tdb->num_lockrecs = 0;
1727                 SAFE_FREE(tdb->lockrecs);
1728         }
1729
1730         /* restore the normal io methods */
1731         tdb->methods = tdb->transaction->io_methods;
1732
1733         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1734         tdb_transaction_unlock(tdb);
1735         SAFE_FREE(tdb->transaction->hash_heads);
1736         SAFE_FREE(tdb->transaction);
1737
1738         return 0;
1739 }
1740
1741 /*
1742   sync to disk
1743 */
1744 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1745 {
1746         if (fsync(tdb->fd) != 0) {
1747                 tdb->ecode = TDB_ERR_IO;
1748                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1749                 return -1;
1750         }
1751 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
1752         if (tdb->map_ptr) {
1753                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1754                 if (msync(moffset + (char *)tdb->map_ptr,
1755                           length + (offset - moffset), MS_SYNC) != 0) {
1756                         tdb->ecode = TDB_ERR_IO;
1757                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1758                                  strerror(errno)));
1759                         return -1;
1760                 }
1761         }
1762 #endif
1763         return 0;
1764 }
1765
1766
1767 /*
1768   work out how much space the linearised recovery data will consume
1769 */
1770 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1771 {
1772         struct tdb_transaction_el *el;
1773         tdb_len_t recovery_size = 0;
1774
1775         recovery_size = sizeof(u32);
1776         for (el=tdb->transaction->elements;el;el=el->next) {
1777                 if (el->offset >= tdb->transaction->old_map_size) {
1778                         continue;
1779                 }
1780                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1781         }
1782
1783         return recovery_size;
1784 }
1785
1786 /*
1787   allocate the recovery area, or use an existing recovery area if it is
1788   large enough
1789 */
1790 static int tdb_recovery_allocate(struct tdb_context *tdb,
1791                                  tdb_len_t *recovery_size,
1792                                  tdb_off_t *recovery_offset,
1793                                  tdb_len_t *recovery_max_size)
1794 {
1795         struct list_struct rec;
1796         const struct tdb_methods *methods = tdb->transaction->io_methods;
1797         tdb_off_t recovery_head;
1798
1799         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1800                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1801                 return -1;
1802         }
1803
1804         rec.rec_len = 0;
1805
1806         if (recovery_head != 0 &&
1807             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1808                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1809                 return -1;
1810         }
1811
1812         *recovery_size = tdb_recovery_size(tdb);
1813
1814         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1815                 /* it fits in the existing area */
1816                 *recovery_max_size = rec.rec_len;
1817                 *recovery_offset = recovery_head;
1818                 return 0;
1819         }
1820
1821         /* we need to free up the old recovery area, then allocate a
1822            new one at the end of the file. Note that we cannot use
1823            tdb_allocate() to allocate the new one as that might return
1824            us an area that is being currently used (as of the start of
1825            the transaction) */
1826         if (recovery_head != 0) {
1827                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1828                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1829                         return -1;
1830                 }
1831         }
1832
1833         /* the tdb_free() call might have increased the recovery size */
1834         *recovery_size = tdb_recovery_size(tdb);
1835
1836         /* round up to a multiple of page size */
1837         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1838         *recovery_offset = tdb->map_size;
1839         recovery_head = *recovery_offset;
1840
1841         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1842                                      (tdb->map_size - tdb->transaction->old_map_size) +
1843                                      sizeof(rec) + *recovery_max_size) == -1) {
1844                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1845                 return -1;
1846         }
1847
1848         /* remap the file (if using mmap) */
1849         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1850
1851         /* we have to reset the old map size so that we don't try to expand the file
1852            again in the transaction commit, which would destroy the recovery area */
1853         tdb->transaction->old_map_size = tdb->map_size;
1854
1855         /* write the recovery header offset and sync - we can sync without a race here
1856            as the magic ptr in the recovery record has not been set */
1857         CONVERT(recovery_head);
1858         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1859                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1860                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1861                 return -1;
1862         }
1863
1864         return 0;
1865 }
1866
1867
1868 /*
1869   setup the recovery data that will be used on a crash during commit
1870 */
1871 static int transaction_setup_recovery(struct tdb_context *tdb,
1872                                       tdb_off_t *magic_offset)
1873 {
1874         struct tdb_transaction_el *el;
1875         tdb_len_t recovery_size;
1876         unsigned char *data, *p;
1877         const struct tdb_methods *methods = tdb->transaction->io_methods;
1878         struct list_struct *rec;
1879         tdb_off_t recovery_offset, recovery_max_size;
1880         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1881         u32 magic, tailer;
1882
1883         /*
1884           check that the recovery area has enough space
1885         */
1886         if (tdb_recovery_allocate(tdb, &recovery_size,
1887                                   &recovery_offset, &recovery_max_size) == -1) {
1888                 return -1;
1889         }
1890
1891         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1892         if (data == NULL) {
1893                 tdb->ecode = TDB_ERR_OOM;
1894                 return -1;
1895         }
1896
1897         rec = (struct list_struct *)data;
1898         memset(rec, 0, sizeof(*rec));
1899
1900         rec->magic    = 0;
1901         rec->data_len = recovery_size;
1902         rec->rec_len  = recovery_max_size;
1903         rec->key_len  = old_map_size;
1904         CONVERT(rec);
1905
1906         /* build the recovery data into a single blob to allow us to do a single
1907            large write, which should be more efficient */
1908         p = data + sizeof(*rec);
1909         for (el=tdb->transaction->elements;el;el=el->next) {
1910                 if (el->offset >= old_map_size) {
1911                         continue;
1912                 }
1913                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1914                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1915                         free(data);
1916                         tdb->ecode = TDB_ERR_CORRUPT;
1917                         return -1;
1918                 }
1919                 memcpy(p, &el->offset, 4);
1920                 memcpy(p+4, &el->length, 4);
1921                 if (DOCONV()) {
1922                         tdb_convert(p, 8);
1923                 }
1924                 /* the recovery area contains the old data, not the
1925                    new data, so we have to call the original tdb_read
1926                    method to get it */
1927                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1928                         free(data);
1929                         tdb->ecode = TDB_ERR_IO;
1930                         return -1;
1931                 }
1932                 p += 8 + el->length;
1933         }
1934
1935         /* and the tailer */
1936         tailer = sizeof(*rec) + recovery_max_size;
1937         memcpy(p, &tailer, 4);
1938         CONVERT(p);
1939
1940         /* write the recovery data to the recovery area */
1941         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1942                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1943                 free(data);
1944                 tdb->ecode = TDB_ERR_IO;
1945                 return -1;
1946         }
1947
1948         /* as we don't have ordered writes, we have to sync the recovery
1949            data before we update the magic to indicate that the recovery
1950            data is present */
1951         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1952                 free(data);
1953                 return -1;
1954         }
1955
1956         free(data);
1957
1958         magic = TDB_RECOVERY_MAGIC;
1959         CONVERT(magic);
1960
1961         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1962
1963         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1964                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1965                 tdb->ecode = TDB_ERR_IO;
1966                 return -1;
1967         }
1968
1969         /* ensure the recovery magic marker is on disk */
1970         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1971                 return -1;
1972         }
1973
1974         return 0;
1975 }
1976
1977 /*
1978   commit the current transaction
1979 */
1980 int tdb_transaction_commit(struct tdb_context *tdb)
1981 {
1982         const struct tdb_methods *methods;
1983         tdb_off_t magic_offset = 0;
1984         u32 zero = 0;
1985
1986         if (tdb->transaction == NULL) {
1987                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1988                 return -1;
1989         }
1990
1991         if (tdb->transaction->transaction_error) {
1992                 tdb->ecode = TDB_ERR_IO;
1993                 tdb_transaction_cancel(tdb);
1994                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1995                 return -1;
1996         }
1997
1998         if (tdb->transaction->nesting != 0) {
1999                 tdb->transaction->nesting--;
2000                 return 0;
2001         }
2002
2003         /* check for a null transaction */
2004         if (tdb->transaction->elements == NULL) {
2005                 tdb_transaction_cancel(tdb);
2006                 return 0;
2007         }
2008
2009         methods = tdb->transaction->io_methods;
2010
2011         /* if there are any locks pending then the caller has not
2012            nested their locks properly, so fail the transaction */
2013         if (tdb->num_locks || tdb->global_lock.count) {
2014                 tdb->ecode = TDB_ERR_LOCK;
2015                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2016                 tdb_transaction_cancel(tdb);
2017                 return -1;
2018         }
2019
2020         /* upgrade the main transaction lock region to a write lock */
2021         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2022                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2023                 tdb->ecode = TDB_ERR_LOCK;
2024                 tdb_transaction_cancel(tdb);
2025                 return -1;
2026         }
2027
2028         /* get the global lock - this prevents new users attaching to the database
2029            during the commit */
2030         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2031                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2032                 tdb->ecode = TDB_ERR_LOCK;
2033                 tdb_transaction_cancel(tdb);
2034                 return -1;
2035         }
2036
2037         if (!(tdb->flags & TDB_NOSYNC)) {
2038                 /* write the recovery data to the end of the file */
2039                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2040                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2041                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2042                         tdb_transaction_cancel(tdb);
2043                         return -1;
2044                 }
2045         }
2046
2047         /* expand the file to the new size if needed */
2048         if (tdb->map_size != tdb->transaction->old_map_size) {
2049                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2050                                              tdb->map_size -
2051                                              tdb->transaction->old_map_size) == -1) {
2052                         tdb->ecode = TDB_ERR_IO;
2053                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2054                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2055                         tdb_transaction_cancel(tdb);
2056                         return -1;
2057                 }
2058                 tdb->map_size = tdb->transaction->old_map_size;
2059                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2060         }
2061
2062         /* perform all the writes */
2063         while (tdb->transaction->elements) {
2064                 struct tdb_transaction_el *el = tdb->transaction->elements;
2065
2066                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2067                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2068
2069                         /* we've overwritten part of the data and
2070                            possibly expanded the file, so we need to
2071                            run the crash recovery code */
2072                         tdb->methods = methods;
2073                         tdb_transaction_recover(tdb);
2074
2075                         tdb_transaction_cancel(tdb);
2076                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2077
2078                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2079                         return -1;
2080                 }
2081                 tdb->transaction->elements = el->next;
2082                 free(el->data);
2083                 free(el);
2084         }
2085
2086         if (!(tdb->flags & TDB_NOSYNC)) {
2087                 /* ensure the new data is on disk */
2088                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2089                         return -1;
2090                 }
2091
2092                 /* remove the recovery marker */
2093                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2094                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2095                         return -1;
2096                 }
2097
2098                 /* ensure the recovery marker has been removed on disk */
2099                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2100                         return -1;
2101                 }
2102         }
2103
2104         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2105
2106         /*
2107           TODO: maybe write to some dummy hdr field, or write to magic
2108           offset without mmap, before the last sync, instead of the
2109           utime() call
2110         */
2111
2112         /* on some systems (like Linux 2.6.x) changes via mmap/msync
2113            don't change the mtime of the file, this means the file may
2114            not be backed up (as tdb rounding to block sizes means that
2115            file size changes are quite rare too). The following forces
2116            mtime changes when a transaction completes */
2117 #ifdef HAVE_UTIME
2118         utime(tdb->name, NULL);
2119 #endif
2120
2121         /* use a transaction cancel to free memory and remove the
2122            transaction locks */
2123         tdb_transaction_cancel(tdb);
2124         return 0;
2125 }
2126
2127
2128 /*
2129   recover from an aborted transaction. Must be called with exclusive
2130   database write access already established (including the global
2131   lock to prevent new processes attaching)
2132 */
2133 int tdb_transaction_recover(struct tdb_context *tdb)
2134 {
2135         tdb_off_t recovery_head, recovery_eof;
2136         unsigned char *data, *p;
2137         u32 zero = 0;
2138         struct list_struct rec;
2139
2140         /* find the recovery area */
2141         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2142                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2143                 tdb->ecode = TDB_ERR_IO;
2144                 return -1;
2145         }
2146
2147         if (recovery_head == 0) {
2148                 /* we have never allocated a recovery record */
2149                 return 0;
2150         }
2151
2152         /* read the recovery record */
2153         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2154                                    sizeof(rec), DOCONV()) == -1) {
2155                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2156                 tdb->ecode = TDB_ERR_IO;
2157                 return -1;
2158         }
2159
2160         if (rec.magic != TDB_RECOVERY_MAGIC) {
2161                 /* there is no valid recovery data */
2162                 return 0;
2163         }
2164
2165         if (tdb->read_only) {
2166                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2167                 tdb->ecode = TDB_ERR_CORRUPT;
2168                 return -1;
2169         }
2170
2171         recovery_eof = rec.key_len;
2172
2173         data = (unsigned char *)malloc(rec.data_len);
2174         if (data == NULL) {
2175                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2176                 tdb->ecode = TDB_ERR_OOM;
2177                 return -1;
2178         }
2179
2180         /* read the full recovery data */
2181         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2182                                    rec.data_len, 0) == -1) {
2183                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2184                 tdb->ecode = TDB_ERR_IO;
2185                 return -1;
2186         }
2187
2188         /* recover the file data */
2189         p = data;
2190         while (p+8 < data + rec.data_len) {
2191                 u32 ofs, len;
2192                 if (DOCONV()) {
2193                         tdb_convert(p, 8);
2194                 }
2195                 memcpy(&ofs, p, 4);
2196                 memcpy(&len, p+4, 4);
2197
2198                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2199                         free(data);
2200                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2201                         tdb->ecode = TDB_ERR_IO;
2202                         return -1;
2203                 }
2204                 p += 8 + len;
2205         }
2206
2207         free(data);
2208
2209         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2210                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2211                 tdb->ecode = TDB_ERR_IO;
2212                 return -1;
2213         }
2214
2215         /* if the recovery area is after the recovered eof then remove it */
2216         if (recovery_eof <= recovery_head) {
2217                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2218                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2219                         tdb->ecode = TDB_ERR_IO;
2220                         return -1;
2221                 }
2222         }
2223
2224         /* remove the recovery magic */
2225         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2226                           &zero) == -1) {
2227                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2228                 tdb->ecode = TDB_ERR_IO;
2229                 return -1;
2230         }
2231
2232         /* reduce the file size to the old size */
2233         tdb_munmap(tdb);
2234         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2235                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2236                 tdb->ecode = TDB_ERR_IO;
2237                 return -1;
2238         }
2239         tdb->map_size = recovery_eof;
2240         tdb_mmap(tdb);
2241
2242         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2243                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2244                 tdb->ecode = TDB_ERR_IO;
2245                 return -1;
2246         }
2247
2248         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2249                  recovery_eof));
2250
2251         /* all done */
2252         return 0;
2253 }
2254
2255 /* file: freelist.c */
2256
2257 /* read a freelist record and check for simple errors */
2258 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2259 {
2260         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2261                 return -1;
2262
2263         if (rec->magic == TDB_MAGIC) {
2264                 /* this happens when a app is showdown while deleting a record - we should
2265                    not completely fail when this happens */
2266                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2267                          rec->magic, off));
2268                 rec->magic = TDB_FREE_MAGIC;
2269                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2270                         return -1;
2271         }
2272
2273         if (rec->magic != TDB_FREE_MAGIC) {
2274                 /* Ensure ecode is set for log fn. */
2275                 tdb->ecode = TDB_ERR_CORRUPT;
2276                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2277                            rec->magic, off));
2278                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2279         }
2280         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2281                 return -1;
2282         return 0;
2283 }
2284
2285
2286
2287 /* Remove an element from the freelist.  Must have alloc lock. */
2288 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2289 {
2290         tdb_off_t last_ptr, i;
2291
2292         /* read in the freelist top */
2293         last_ptr = FREELIST_TOP;
2294         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2295                 if (i == off) {
2296                         /* We've found it! */
2297                         return tdb_ofs_write(tdb, last_ptr, &next);
2298                 }
2299                 /* Follow chain (next offset is at start of record) */
2300                 last_ptr = i;
2301         }
2302         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2303         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2304 }
2305
2306
2307 /* update a record tailer (must hold allocation lock) */
2308 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2309                          const struct list_struct *rec)
2310 {
2311         tdb_off_t totalsize;
2312
2313         /* Offset of tailer from record header */
2314         totalsize = sizeof(*rec) + rec->rec_len;
2315         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2316                          &totalsize);
2317 }
2318
2319 /* Add an element into the freelist. Merge adjacent records if
2320    neccessary. */
2321 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2322 {
2323         tdb_off_t right, left;
2324
2325         /* Allocation and tailer lock */
2326         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2327                 return -1;
2328
2329         /* set an initial tailer, so if we fail we don't leave a bogus record */
2330         if (update_tailer(tdb, offset, rec) != 0) {
2331                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2332                 goto fail;
2333         }
2334
2335         /* Look right first (I'm an Australian, dammit) */
2336         right = offset + sizeof(*rec) + rec->rec_len;
2337         if (right + sizeof(*rec) <= tdb->map_size) {
2338                 struct list_struct r;
2339
2340                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2341                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2342                         goto left;
2343                 }
2344
2345                 /* If it's free, expand to include it. */
2346                 if (r.magic == TDB_FREE_MAGIC) {
2347                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2348                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2349                                 goto left;
2350                         }
2351                         rec->rec_len += sizeof(r) + r.rec_len;
2352                 }
2353         }
2354
2355 left:
2356         /* Look left */
2357         left = offset - sizeof(tdb_off_t);
2358         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2359                 struct list_struct l;
2360                 tdb_off_t leftsize;
2361
2362                 /* Read in tailer and jump back to header */
2363                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2364                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2365                         goto update;
2366                 }
2367
2368                 /* it could be uninitialised data */
2369                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2370                         goto update;
2371                 }
2372
2373                 left = offset - leftsize;
2374
2375                 /* Now read in record */
2376                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2377                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2378                         goto update;
2379                 }
2380
2381                 /* If it's free, expand to include it. */
2382                 if (l.magic == TDB_FREE_MAGIC) {
2383                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2384                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2385                                 goto update;
2386                         } else {
2387                                 offset = left;
2388                                 rec->rec_len += leftsize;
2389                         }
2390                 }
2391         }
2392
2393 update:
2394         if (update_tailer(tdb, offset, rec) == -1) {
2395                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2396                 goto fail;
2397         }
2398
2399         /* Now, prepend to free list */
2400         rec->magic = TDB_FREE_MAGIC;
2401
2402         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2403             tdb_rec_write(tdb, offset, rec) == -1 ||
2404             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2405                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2406                 goto fail;
2407         }
2408
2409         /* And we're done. */
2410         tdb_unlock(tdb, -1, F_WRLCK);
2411         return 0;
2412
2413  fail:
2414         tdb_unlock(tdb, -1, F_WRLCK);
2415         return -1;
2416 }
2417
2418
2419 /*
2420    the core of tdb_allocate - called when we have decided which
2421    free list entry to use
2422  */
2423 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2424                                 struct list_struct *rec, tdb_off_t last_ptr)
2425 {
2426         struct list_struct newrec;
2427         tdb_off_t newrec_ptr;
2428
2429         memset(&newrec, '\0', sizeof(newrec));
2430
2431         /* found it - now possibly split it up  */
2432         if (rec->rec_len > length + MIN_REC_SIZE) {
2433                 /* Length of left piece */
2434                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2435
2436                 /* Right piece to go on free list */
2437                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2438                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2439
2440                 /* And left record is shortened */
2441                 rec->rec_len = length;
2442         } else {
2443                 newrec_ptr = 0;
2444         }
2445
2446         /* Remove allocated record from the free list */
2447         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2448                 return 0;
2449         }
2450
2451         /* Update header: do this before we drop alloc
2452            lock, otherwise tdb_free() might try to
2453            merge with us, thinking we're free.
2454            (Thanks Jeremy Allison). */
2455         rec->magic = TDB_MAGIC;
2456         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2457                 return 0;
2458         }
2459
2460         /* Did we create new block? */
2461         if (newrec_ptr) {
2462                 /* Update allocated record tailer (we
2463                    shortened it). */
2464                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2465                         return 0;
2466                 }
2467
2468                 /* Free new record */
2469                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2470                         return 0;
2471                 }
2472         }
2473
2474         /* all done - return the new record offset */
2475         return rec_ptr;
2476 }
2477
2478 /* allocate some space from the free list. The offset returned points
2479    to a unconnected list_struct within the database with room for at
2480    least length bytes of total data
2481
2482    0 is returned if the space could not be allocated
2483  */
2484 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2485 {
2486         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2487         struct {
2488                 tdb_off_t rec_ptr, last_ptr;
2489                 tdb_len_t rec_len;
2490         } bestfit;
2491
2492         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2493                 return 0;
2494
2495         /* Extra bytes required for tailer */
2496         length += sizeof(tdb_off_t);
2497
2498  again:
2499         last_ptr = FREELIST_TOP;
2500
2501         /* read in the freelist top */
2502         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2503                 goto fail;
2504
2505         bestfit.rec_ptr = 0;
2506         bestfit.last_ptr = 0;
2507         bestfit.rec_len = 0;
2508
2509         /*
2510            this is a best fit allocation strategy. Originally we used
2511            a first fit strategy, but it suffered from massive fragmentation
2512            issues when faced with a slowly increasing record size.
2513          */
2514         while (rec_ptr) {
2515                 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2516                         goto fail;
2517                 }
2518
2519                 if (rec->rec_len >= length) {
2520                         if (bestfit.rec_ptr == 0 ||
2521                             rec->rec_len < bestfit.rec_len) {
2522                                 bestfit.rec_len = rec->rec_len;
2523                                 bestfit.rec_ptr = rec_ptr;
2524                                 bestfit.last_ptr = last_ptr;
2525                                 /* consider a fit to be good enough if
2526                                    we aren't wasting more than half
2527                                    the space */
2528                                 if (bestfit.rec_len < 2*length) {
2529                                         break;
2530                                 }
2531                         }
2532                 }
2533
2534                 /* move to the next record */
2535                 last_ptr = rec_ptr;
2536                 rec_ptr = rec->next;
2537         }
2538
2539         if (bestfit.rec_ptr != 0) {
2540                 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2541                         goto fail;
2542                 }
2543
2544                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2545                 tdb_unlock(tdb, -1, F_WRLCK);
2546                 return newrec_ptr;
2547         }
2548
2549         /* we didn't find enough space. See if we can expand the
2550            database and if we can then try again */
2551         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2552                 goto again;
2553  fail:
2554         tdb_unlock(tdb, -1, F_WRLCK);
2555         return 0;
2556 }
2557
2558 /* file: freelistcheck.c */
2559
2560 /* Check the freelist is good and contains no loops.
2561    Very memory intensive - only do this as a consistency
2562    checker. Heh heh - uses an in memory tdb as the storage
2563    for the "seen" record list. For some reason this strikes
2564    me as extremely clever as I don't have to write another tree
2565    data structure implementation :-).
2566  */
2567
2568 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2569 {
2570         TDB_DATA key, data;
2571
2572         memset(&data, '\0', sizeof(data));
2573         key.dptr = (unsigned char *)&rec_ptr;
2574         key.dsize = sizeof(rec_ptr);
2575         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2576 }
2577
2578 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2579 {
2580         struct tdb_context *mem_tdb = NULL;
2581         struct list_struct rec;
2582         tdb_off_t rec_ptr, last_ptr;
2583         int ret = -1;
2584
2585         *pnum_entries = 0;
2586
2587         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2588                                 TDB_INTERNAL, O_RDWR, 0600);
2589         if (!mem_tdb) {
2590                 return -1;
2591         }
2592
2593         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2594                 tdb_close(mem_tdb);
2595                 return 0;
2596         }
2597
2598         last_ptr = FREELIST_TOP;
2599
2600         /* Store the FREELIST_TOP record. */
2601         if (seen_insert(mem_tdb, last_ptr) == -1) {
2602                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2603                 goto fail;
2604         }
2605
2606         /* read in the freelist top */
2607         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2608                 goto fail;
2609         }
2610
2611         while (rec_ptr) {
2612
2613                 /* If we can't store this record (we've seen it
2614                    before) then the free list has a loop and must
2615                    be corrupt. */
2616
2617                 if (seen_insert(mem_tdb, rec_ptr)) {
2618                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2619                         goto fail;
2620                 }
2621
2622                 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2623                         goto fail;
2624                 }
2625
2626                 /* move to the next record */
2627                 last_ptr = rec_ptr;
2628                 rec_ptr = rec.next;
2629                 *pnum_entries += 1;
2630         }
2631
2632         ret = 0;
2633
2634   fail:
2635
2636         tdb_close(mem_tdb);
2637         tdb_unlock(tdb, -1, F_WRLCK);
2638         return ret;
2639 }
2640
2641 /* file: traverse.c */
2642
2643 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2644 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2645                          struct list_struct *rec)
2646 {
2647         int want_next = (tlock->off != 0);
2648
2649         /* Lock each chain from the start one. */
2650         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2651                 if (!tlock->off && tlock->hash != 0) {
2652                         /* this is an optimisation for the common case where
2653                            the hash chain is empty, which is particularly
2654                            common for the use of tdb with ldb, where large
2655                            hashes are used. In that case we spend most of our
2656                            time in tdb_brlock(), locking empty hash chains.
2657
2658                            To avoid this, we do an unlocked pre-check to see
2659                            if the hash chain is empty before starting to look
2660                            inside it. If it is empty then we can avoid that
2661                            hash chain. If it isn't empty then we can't believe
2662                            the value we get back, as we read it without a
2663                            lock, so instead we get the lock and re-fetch the
2664                            value below.
2665
2666                            Notice that not doing this optimisation on the
2667                            first hash chain is critical. We must guarantee
2668                            that we have done at least one fcntl lock at the
2669                            start of a search to guarantee that memory is
2670                            coherent on SMP systems. If records are added by
2671                            others during the search then thats OK, and we
2672                            could possibly miss those with this trick, but we
2673                            could miss them anyway without this trick, so the
2674                            semantics don't change.
2675
2676                            With a non-indexed ldb search this trick gains us a
2677                            factor of around 80 in speed on a linux 2.6.x
2678                            system (testing using ldbtest).
2679                         */
2680                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2681                         if (tlock->hash == tdb->header.hash_size) {
2682                                 continue;
2683                         }
2684                 }
2685
2686                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2687                         return -1;
2688
2689                 /* No previous record?  Start at top of chain. */
2690                 if (!tlock->off) {
2691                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2692                                      &tlock->off) == -1)
2693                                 goto fail;
2694                 } else {
2695                         /* Otherwise unlock the previous record. */
2696                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2697                                 goto fail;
2698                 }
2699
2700                 if (want_next) {
2701                         /* We have offset of old record: grab next */
2702                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2703                                 goto fail;
2704                         tlock->off = rec->next;
2705                 }
2706
2707                 /* Iterate through chain */
2708                 while( tlock->off) {
2709                         tdb_off_t current;
2710                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2711                                 goto fail;
2712
2713                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2714                         if (tlock->off == rec->next) {
2715                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2716                                 goto fail;
2717                         }
2718
2719                         if (!TDB_DEAD(rec)) {
2720                                 /* Woohoo: we found one! */
2721                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2722                                         goto fail;
2723                                 return tlock->off;
2724                         }
2725
2726                         /* Try to clean dead ones from old traverses */
2727                         current = tlock->off;
2728                         tlock->off = rec->next;
2729                         if (!(tdb->read_only || tdb->traverse_read) &&
2730                             tdb_do_delete(tdb, current, rec) != 0)
2731                                 goto fail;
2732                 }
2733                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2734                 want_next = 0;
2735         }
2736         /* We finished iteration without finding anything */
2737         return TDB_ERRCODE(TDB_SUCCESS, 0);
2738
2739  fail:
2740         tlock->off = 0;
2741         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2742                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2743         return -1;
2744 }
2745
2746 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2747    return -1 on error or the record count traversed
2748    if fn is NULL then it is not called
2749    a non-zero return value from fn() indicates that the traversal should stop
2750   */
2751 static int tdb_traverse_internal(struct tdb_context *tdb,
2752                                  tdb_traverse_func fn, void *private_data,
2753                                  struct tdb_traverse_lock *tl)
2754 {
2755         TDB_DATA key, dbuf;
2756         struct list_struct rec;
2757         int ret, count = 0;
2758
2759         /* This was in the initializaton, above, but the IRIX compiler
2760          * did not like it.  crh
2761          */
2762         tl->next = tdb->travlocks.next;
2763
2764         /* fcntl locks don't stack: beware traverse inside traverse */
2765         tdb->travlocks.next = tl;
2766
2767         /* tdb_next_lock places locks on the record returned, and its chain */
2768         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2769                 count++;
2770                 /* now read the full record */
2771                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2772                                           rec.key_len + rec.data_len);
2773                 if (!key.dptr) {
2774                         ret = -1;
2775                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2776                                 goto out;
2777                         if (tdb_unlock_record(tdb, tl->off) != 0)
2778                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2779                         goto out;
2780                 }
2781                 key.dsize = rec.key_len;
2782                 dbuf.dptr = key.dptr + rec.key_len;
2783                 dbuf.dsize = rec.data_len;
2784
2785                 /* Drop chain lock, call out */
2786                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2787                         ret = -1;
2788                         SAFE_FREE(key.dptr);
2789                         goto out;
2790                 }
2791                 if (fn && fn(tdb, key, dbuf, private_data)) {
2792                         /* They want us to terminate traversal */
2793                         ret = count;
2794                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2795                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2796                                 ret = -1;
2797                         }
2798                         SAFE_FREE(key.dptr);
2799                         goto out;
2800                 }
2801                 SAFE_FREE(key.dptr);
2802         }
2803 out:
2804         tdb->travlocks.next = tl->next;
2805         if (ret < 0)
2806                 return -1;
2807         else
2808                 return count;
2809 }
2810
2811
2812 /*
2813   a write style traverse - temporarily marks the db read only
2814 */
2815 int tdb_traverse_read(struct tdb_context *tdb,
2816                       tdb_traverse_func fn, void *private_data)
2817 {
2818         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2819         int ret;
2820
2821         /* we need to get a read lock on the transaction lock here to
2822            cope with the lock ordering semantics of solaris10 */
2823         if (tdb_transaction_lock(tdb, F_RDLCK)) {
2824                 return -1;
2825         }
2826
2827         tdb->traverse_read++;
2828         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2829         tdb->traverse_read--;
2830
2831         tdb_transaction_unlock(tdb);
2832
2833         return ret;
2834 }
2835
2836 /*
2837   a write style traverse - needs to get the transaction lock to
2838   prevent deadlocks
2839 */
2840 int tdb_traverse(struct tdb_context *tdb,
2841                  tdb_traverse_func fn, void *private_data)
2842 {
2843         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2844         int ret;
2845
2846         if (tdb->read_only || tdb->traverse_read) {
2847                 return tdb_traverse_read(tdb, fn, private_data);
2848         }
2849
2850         if (tdb_transaction_lock(tdb, F_WRLCK)) {
2851                 return -1;
2852         }
2853
2854         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2855
2856         tdb_transaction_unlock(tdb);
2857
2858         return ret;
2859 }
2860
2861
2862 /* find the first entry in the database and return its key */
2863 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2864 {
2865         TDB_DATA key;
2866         struct list_struct rec;
2867
2868         /* release any old lock */
2869         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2870                 return tdb_null;
2871         tdb->travlocks.off = tdb->travlocks.hash = 0;
2872         tdb->travlocks.lock_rw = F_RDLCK;
2873
2874         /* Grab first record: locks chain and returned record. */
2875         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2876                 return tdb_null;
2877         /* now read the key */
2878         key.dsize = rec.key_len;
2879         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2880
2881         /* Unlock the hash chain of the record we just read. */
2882         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2883                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2884         return key;
2885 }
2886
2887 /* find the next entry in the database, returning its key */
2888 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2889 {
2890         u32 oldhash;
2891         TDB_DATA key = tdb_null;
2892         struct list_struct rec;
2893         unsigned char *k = NULL;
2894
2895         /* Is locked key the old key?  If so, traverse will be reliable. */
2896         if (tdb->travlocks.off) {
2897                 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2898                         return tdb_null;
2899                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2900                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2901                                             rec.key_len))
2902                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2903                         /* No, it wasn't: unlock it and start from scratch */
2904                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2905                                 SAFE_FREE(k);
2906                                 return tdb_null;
2907                         }
2908                         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2909                                 SAFE_FREE(k);
2910                                 return tdb_null;
2911                         }
2912                         tdb->travlocks.off = 0;
2913                 }
2914
2915                 SAFE_FREE(k);
2916         }
2917
2918         if (!tdb->travlocks.off) {
2919                 /* No previous element: do normal find, and lock record */
2920                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2921                 if (!tdb->travlocks.off)
2922                         return tdb_null;
2923                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2924                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2925                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2926                         return tdb_null;
2927                 }
2928         }
2929         oldhash = tdb->travlocks.hash;
2930
2931         /* Grab next record: locks chain and returned record,
2932            unlocks old record */
2933         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2934                 key.dsize = rec.key_len;
2935                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2936                                           key.dsize);
2937                 /* Unlock the chain of this new record */
2938                 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2939                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2940         }
2941         /* Unlock the chain of old record */
2942         if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2943                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2944         return key;
2945 }
2946
2947 /* file: dump.c */
2948
2949 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2950                                  tdb_off_t offset)
2951 {
2952         struct list_struct rec;
2953         tdb_off_t tailer_ofs, tailer;
2954
2955         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2956                                    sizeof(rec), DOCONV()) == -1) {
2957                 printf("ERROR: failed to read record at %u\n", offset);
2958                 return 0;
2959         }
2960
2961         printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2962                "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2963                hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2964                rec.full_hash, rec.magic);
2965
2966         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2967
2968         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2969                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2970                 return rec.next;
2971         }
2972
2973         if (tailer != rec.rec_len + sizeof(rec)) {
2974                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2975                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2976         }
2977         return rec.next;
2978 }
2979
2980 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2981 {
2982         tdb_off_t rec_ptr, top;
2983
2984         top = TDB_HASH_TOP(i);
2985
2986         if (tdb_lock(tdb, i, F_WRLCK) != 0)
2987                 return -1;
2988
2989         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2990                 return tdb_unlock(tdb, i, F_WRLCK);
2991
2992         if (rec_ptr)
2993                 printf("hash=%d\n", i);
2994
2995         while (rec_ptr) {
2996                 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2997         }
2998
2999         return tdb_unlock(tdb, i, F_WRLCK);
3000 }
3001
3002 void tdb_dump_all(struct tdb_context *tdb)
3003 {
3004         int i;
3005         for (i=0;i<tdb->header.hash_size;i++) {
3006                 tdb_dump_chain(tdb, i);
3007         }
3008         printf("freelist:\n");
3009         tdb_dump_chain(tdb, -1);
3010 }
3011
3012 int tdb_printfreelist(struct tdb_context *tdb)
3013 {
3014         int ret;
3015         long total_free = 0;
3016         tdb_off_t offset, rec_ptr;
3017         struct list_struct rec;
3018
3019         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3020                 return ret;
3021
3022         offset = FREELIST_TOP;
3023
3024         /* read in the freelist top */
3025         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3026                 tdb_unlock(tdb, -1, F_WRLCK);
3027                 return 0;
3028         }
3029
3030         printf("freelist top=[0x%08x]\n", rec_ptr );
3031         while (rec_ptr) {
3032                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3033                                            sizeof(rec), DOCONV()) == -1) {
3034                         tdb_unlock(tdb, -1, F_WRLCK);
3035                         return -1;
3036                 }
3037
3038                 if (rec.magic != TDB_FREE_MAGIC) {
3039                         printf("bad magic 0x%08x in free list\n", rec.magic);
3040                         tdb_unlock(tdb, -1, F_WRLCK);
3041                         return -1;
3042                 }
3043
3044                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3045                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3046                 total_free += rec.rec_len;
3047
3048                 /* move to the next record */
3049                 rec_ptr = rec.next;
3050         }
3051         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3052                (int)total_free);
3053
3054         return tdb_unlock(tdb, -1, F_WRLCK);
3055 }
3056
3057 /* file: tdb.c */
3058
3059 /*
3060   non-blocking increment of the tdb sequence number if the tdb has been opened using
3061   the TDB_SEQNUM flag
3062 */
3063 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3064 {
3065         tdb_off_t seqnum=0;
3066
3067         if (!(tdb->flags & TDB_SEQNUM)) {
3068                 return;
3069         }
3070
3071         /* we ignore errors from this, as we have no sane way of
3072            dealing with them.
3073         */
3074         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3075         seqnum++;
3076         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3077 }
3078
3079 /*
3080   increment the tdb sequence number if the tdb has been opened using
3081   the TDB_SEQNUM flag
3082 */
3083 static void tdb_increment_seqnum(struct tdb_context *tdb)
3084 {
3085         if (!(tdb->flags & TDB_SEQNUM)) {
3086                 return;
3087         }
3088
3089         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3090                 return;
3091         }
3092
3093         tdb_increment_seqnum_nonblock(tdb);
3094
3095         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3096 }
3097
3098 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3099 {
3100         return memcmp(data.dptr, key.dptr, data.dsize);
3101 }
3102
3103 /* Returns 0 on fail.  On success, return offset of record, and fills
3104    in rec */
3105 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3106                         struct list_struct *r)
3107 {
3108         tdb_off_t rec_ptr;
3109
3110         /* read in the hash top */
3111         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3112                 return 0;
3113
3114         /* keep looking until we find the right record */
3115         while (rec_ptr) {
3116                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3117                         return 0;
3118
3119                 if (!TDB_DEAD(r) && hash==r->full_hash
3120                     && key.dsize==r->key_len
3121                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3122                                       r->key_len, tdb_key_compare,
3123                                       NULL) == 0) {
3124                         return rec_ptr;
3125                 }
3126                 rec_ptr = r->next;
3127         }
3128         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3129 }
3130
3131 /* As tdb_find, but if you succeed, keep the lock */
3132 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3133                            struct list_struct *rec)
3134 {
3135         u32 rec_ptr;
3136
3137         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3138                 return 0;
3139         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3140                 tdb_unlock(tdb, BUCKET(hash), locktype);
3141         return rec_ptr;
3142 }
3143
3144
3145 /* update an entry in place - this only works if the new data size
3146    is <= the old data size and the key exists.
3147    on failure return -1.
3148 */
3149 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3150 {
3151         struct list_struct rec;
3152         tdb_off_t rec_ptr;
3153
3154         /* find entry */
3155         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3156                 return -1;
3157
3158         /* must be long enough key, data and tailer */
3159         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3160                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3161                 return -1;
3162         }
3163
3164         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3165                       dbuf.dptr, dbuf.dsize) == -1)
3166                 return -1;
3167
3168         if (dbuf.dsize != rec.data_len) {
3169                 /* update size */
3170                 rec.data_len = dbuf.dsize;
3171                 return tdb_rec_write(tdb, rec_ptr, &rec);
3172         }
3173
3174         return 0;
3175 }
3176
3177 /* find an entry in the database given a key */
3178 /* If an entry doesn't exist tdb_err will be set to
3179  * TDB_ERR_NOEXIST. If a key has no data attached
3180  * then the TDB_DATA will have zero length but
3181  * a non-zero pointer
3182  */
3183 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3184 {
3185         tdb_off_t rec_ptr;
3186         struct list_struct rec;
3187         TDB_DATA ret;
3188         u32 hash;
3189
3190         /* find which hash bucket it is in */
3191         hash = tdb->hash_fn(&key);
3192         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3193                 return tdb_null;
3194
3195         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3196                                   rec.data_len);
3197         ret.dsize = rec.data_len;
3198         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3199         return ret;
3200 }
3201
3202 /*
3203  * Find an entry in the database and hand the record's data to a parsing
3204  * function. The parsing function is executed under the chain read lock, so it
3205  * should be fast and should not block on other syscalls.
3206  *
3207  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3208  *
3209  * For mmapped tdb's that do not have a transaction open it points the parsing
3210  * function directly at the mmap area, it avoids the malloc/memcpy in this
3211  * case. If a transaction is open or no mmap is available, it has to do
3212  * malloc/read/parse/free.
3213  *
3214  * This is interesting for all readers of potentially large data structures in
3215  * the tdb records, ldb indexes being one example.
3216  */
3217
3218 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3219                      int (*parser)(TDB_DATA key, TDB_DATA data,
3220                                    void *private_data),
3221                      void *private_data)
3222 {
3223         tdb_off_t rec_ptr;
3224         struct list_struct rec;
3225         int ret;
3226         u32 hash;
3227
3228         /* find which hash bucket it is in */
3229         hash = tdb->hash_fn(&key);
3230
3231         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3232                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3233         }
3234
3235         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3236                              rec.data_len, parser, private_data);
3237
3238         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3239
3240         return ret;
3241 }
3242
3243 /* check if an entry in the database exists
3244
3245    note that 1 is returned if the key is found and 0 is returned if not found
3246    this doesn't match the conventions in the rest of this module, but is
3247    compatible with gdbm
3248 */
3249 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3250 {
3251         struct list_struct rec;
3252
3253         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3254                 return 0;
3255         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3256         return 1;
3257 }
3258
3259 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3260 {
3261         u32 hash = tdb->hash_fn(&key);
3262         return tdb_exists_hash(tdb, key, hash);
3263 }
3264
3265 /* actually delete an entry in the database given the offset */
3266 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3267 {
3268         tdb_off_t last_ptr, i;
3269         struct list_struct lastrec;
3270
3271         if (tdb->read_only || tdb->traverse_read) return -1;
3272
3273         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3274                 /* Someone traversing here: mark it as dead */
3275                 rec->magic = TDB_DEAD_MAGIC;
3276                 return tdb_rec_write(tdb, rec_ptr, rec);
3277         }
3278         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3279                 return -1;
3280
3281         /* find previous record in hash chain */
3282         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3283                 return -1;
3284         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3285                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3286                         return -1;
3287
3288         /* unlink it: next ptr is at start of record. */
3289         if (last_ptr == 0)
3290                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3291         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3292                 return -1;
3293
3294         /* recover the space */
3295         if (tdb_free(tdb, rec_ptr, rec) == -1)
3296                 return -1;
3297         return 0;
3298 }
3299
3300 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3301 {
3302         int res = 0;
3303         tdb_off_t rec_ptr;
3304         struct list_struct rec;
3305
3306         /* read in the hash top */
3307         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3308                 return 0;
3309
3310         while (rec_ptr) {
3311                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3312                         return 0;
3313
3314                 if (rec.magic == TDB_DEAD_MAGIC) {
3315                         res += 1;
3316                 }
3317                 rec_ptr = rec.next;
3318         }
3319         return res;
3320 }
3321
3322 /*
3323  * Purge all DEAD records from a hash chain
3324  */
3325 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3326 {
3327         int res = -1;
3328         struct list_struct rec;
3329         tdb_off_t rec_ptr;
3330
3331         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3332                 return -1;
3333         }
3334
3335         /* read in the hash top */
3336         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3337                 goto fail;
3338
3339         while (rec_ptr) {
3340                 tdb_off_t next;
3341
3342                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3343                         goto fail;
3344                 }
3345
3346                 next = rec.next;
3347
3348                 if (rec.magic == TDB_DEAD_MAGIC
3349                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3350                         goto fail;
3351                 }
3352                 rec_ptr = next;
3353         }
3354         res = 0;
3355  fail:
3356         tdb_unlock(tdb, -1, F_WRLCK);
3357         return res;
3358 }
3359
3360 /* delete an entry in the database given a key */
3361 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3362 {
3363         tdb_off_t rec_ptr;
3364         struct list_struct rec;
3365         int ret;
3366
3367         if (tdb->max_dead_records != 0) {
3368
3369                 /*
3370                  * Allow for some dead records per hash chain, mainly for
3371                  * tdb's with a very high create/delete rate like locking.tdb.
3372                  */
3373
3374                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3375                         return -1;
3376
3377                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3378                         /*
3379                          * Don't let the per-chain freelist grow too large,
3380                          * delete all existing dead records
3381                          */
3382                         tdb_purge_dead(tdb, hash);
3383                 }
3384
3385                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3386                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3387                         return -1;
3388                 }
3389
3390                 /*
3391                  * Just mark the record as dead.
3392                  */
3393                 rec.magic = TDB_DEAD_MAGIC;
3394                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3395         }
3396         else {
3397                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3398                                                    &rec)))
3399                         return -1;
3400
3401                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3402         }
3403
3404         if (ret == 0) {
3405                 tdb_increment_seqnum(tdb);
3406         }
3407
3408         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3409                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3410         return ret;
3411 }
3412
3413 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3414 {
3415         u32 hash = tdb->hash_fn(&key);
3416         return tdb_delete_hash(tdb, key, hash);
3417 }
3418
3419 /*
3420  * See if we have a dead record around with enough space
3421  */
3422 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3423                                struct list_struct *r, tdb_len_t length)
3424 {
3425         tdb_off_t rec_ptr;
3426
3427         /* read in the hash top */
3428         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3429                 return 0;
3430
3431         /* keep looking until we find the right record */
3432         while (rec_ptr) {
3433                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3434                         return 0;
3435
3436                 if (TDB_DEAD(r) && r->rec_len >= length) {
3437                         /*
3438                          * First fit for simple coding, TODO: change to best
3439                          * fit
3440                          */
3441                         return rec_ptr;
3442                 }
3443                 rec_ptr = r->next;
3444         }
3445         return 0;
3446 }
3447
3448 /* store an element in the database, replacing any existing element
3449    with the same key
3450
3451    return 0 on success, -1 on failure
3452 */
3453 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3454 {
3455         struct list_struct rec;
3456         u32 hash;
3457         tdb_off_t rec_ptr;
3458         char *p = NULL;
3459         int ret = -1;
3460
3461         if (tdb->read_only || tdb->traverse_read) {
3462                 tdb->ecode = TDB_ERR_RDONLY;
3463                 return -1;
3464         }
3465
3466         /* find which hash bucket it is in */
3467         hash = tdb->hash_fn(&key);
3468         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3469                 return -1;
3470
3471         /* check for it existing, on insert. */
3472         if (flag == TDB_INSERT) {
3473                 if (tdb_exists_hash(tdb, key, hash)) {
3474                         tdb->ecode = TDB_ERR_EXISTS;
3475                         goto fail;
3476                 }
3477         } else {
3478                 /* first try in-place update, on modify or replace. */
3479                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3480                         goto done;
3481                 }
3482                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3483                     flag == TDB_MODIFY) {
3484                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3485                          we should fail the store */
3486                         goto fail;
3487                 }
3488         }
3489         /* reset the error code potentially set by the tdb_update() */
3490         tdb->ecode = TDB_SUCCESS;
3491
3492         /* delete any existing record - if it doesn't exist we don't
3493            care.  Doing this first reduces fragmentation, and avoids
3494            coalescing with `allocated' block before it's updated. */
3495         if (flag != TDB_INSERT)
3496                 tdb_delete_hash(tdb, key, hash);
3497
3498         /* Copy key+value *before* allocating free space in case malloc
3499            fails and we are left with a dead spot in the tdb. */
3500
3501         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3502                 tdb->ecode = TDB_ERR_OOM;
3503                 goto fail;
3504         }
3505
3506         memcpy(p, key.dptr, key.dsize);
3507         if (dbuf.dsize)
3508                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3509
3510         if (tdb->max_dead_records != 0) {
3511                 /*
3512                  * Allow for some dead records per hash chain, look if we can
3513                  * find one that can hold the new record. We need enough space
3514                  * for key, data and tailer. If we find one, we don't have to
3515                  * consult the central freelist.
3516                  */
3517                 rec_ptr = tdb_find_dead(
3518                         tdb, hash, &rec,
3519                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3520
3521                 if (rec_ptr != 0) {
3522                         rec.key_len = key.dsize;
3523                         rec.data_len = dbuf.dsize;
3524                         rec.full_hash = hash;
3525                         rec.magic = TDB_MAGIC;
3526                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3527                             || tdb->methods->tdb_write(
3528                                     tdb, rec_ptr + sizeof(rec),
3529                                     p, key.dsize + dbuf.dsize) == -1) {
3530                                 goto fail;
3531                         }
3532                         goto done;
3533                 }
3534         }
3535
3536         /*
3537          * We have to allocate some space from the freelist, so this means we
3538          * have to lock it. Use the chance to purge all the DEAD records from
3539          * the hash chain under the freelist lock.
3540          */
3541
3542         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3543                 goto fail;
3544         }
3545
3546         if ((tdb->max_dead_records != 0)
3547             && (tdb_purge_dead(tdb, hash) == -1)) {
3548                 tdb_unlock(tdb, -1, F_WRLCK);
3549                 goto fail;
3550         }
3551
3552         /* we have to allocate some space */
3553         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3554
3555         tdb_unlock(tdb, -1, F_WRLCK);
3556
3557         if (rec_ptr == 0) {
3558                 goto fail;
3559         }
3560
3561         /* Read hash top into next ptr */
3562         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3563                 goto fail;
3564
3565         rec.key_len = key.dsize;
3566         rec.data_len = dbuf.dsize;
3567         rec.full_hash = hash;
3568         rec.magic = TDB_MAGIC;
3569
3570         /* write out and point the top of the hash chain at it */
3571         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3572             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3573             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3574                 /* Need to tdb_unallocate() here */
3575                 goto fail;
3576         }
3577
3578  done:
3579         ret = 0;
3580  fail:
3581         if (ret == 0) {
3582                 tdb_increment_seqnum(tdb);
3583         }
3584
3585         SAFE_FREE(p);
3586         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3587         return ret;
3588 }
3589
3590
3591 /* Append to an entry. Create if not exist. */
3592 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3593 {
3594         u32 hash;
3595         TDB_DATA dbuf;
3596         int ret = -1;
3597
3598         /* find which hash bucket it is in */
3599         hash = tdb->hash_fn(&key);
3600         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3601                 return -1;
3602
3603         dbuf = tdb_fetch(tdb, key);
3604
3605         if (dbuf.dptr == NULL) {
3606                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3607         } else {
3608                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3609                                                      dbuf.dsize + new_dbuf.dsize);
3610                 if (new_dptr == NULL) {
3611                         free(dbuf.dptr);
3612                 }
3613                 dbuf.dptr = new_dptr;
3614         }
3615
3616         if (dbuf.dptr == NULL) {
3617                 tdb->ecode = TDB_ERR_OOM;
3618                 goto failed;
3619         }
3620
3621         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3622         dbuf.dsize += new_dbuf.dsize;
3623
3624         ret = tdb_store(tdb, key, dbuf, 0);
3625
3626 failed:
3627         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3628         SAFE_FREE(dbuf.dptr);
3629         return ret;
3630 }
3631
3632
3633 /*
3634   return the name of the current tdb file
3635   useful for external logging functions
3636 */
3637 const char *tdb_name(struct tdb_context *tdb)
3638 {
3639         return tdb->name;
3640 }
3641
3642 /*
3643   return the underlying file descriptor being used by tdb, or -1
3644   useful for external routines that want to check the device/inode
3645   of the fd
3646 */
3647 int tdb_fd(struct tdb_context *tdb)
3648 {
3649         return tdb->fd;
3650 }
3651
3652 /*
3653   return the current logging function
3654   useful for external tdb routines that wish to log tdb errors
3655 */
3656 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3657 {
3658         return tdb->log.log_fn;
3659 }
3660
3661
3662 /*
3663   get the tdb sequence number. Only makes sense if the writers opened
3664   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3665   quickly, so it should only be used for a 'has something changed'
3666   test, not for code that relies on the count of the number of changes
3667   made. If you want a counter then use a tdb record.
3668
3669   The aim of this sequence number is to allow for a very lightweight
3670   test of a possible tdb change.
3671 */
3672 int tdb_get_seqnum(struct tdb_context *tdb)
3673 {
3674         tdb_off_t seqnum=0;
3675
3676         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3677         return seqnum;
3678 }
3679
3680 int tdb_hash_size(struct tdb_context *tdb)
3681 {
3682         return tdb->header.hash_size;
3683 }
3684
3685 size_t tdb_map_size(struct tdb_context *tdb)
3686 {
3687         return tdb->map_size;
3688 }
3689
3690 int tdb_get_flags(struct tdb_context *tdb)
3691 {
3692         return tdb->flags;
3693 }
3694
3695
3696 /*
3697   enable sequence number handling on an open tdb
3698 */
3699 void tdb_enable_seqnum(struct tdb_context *tdb)
3700 {
3701         tdb->flags |= TDB_SEQNUM;
3702 }
3703
3704 /* file: open.c */
3705
3706 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3707 static struct tdb_context *tdbs = NULL;
3708
3709
3710 /* This is from a hash algorithm suggested by Rogier Wolff */
3711 static unsigned int default_tdb_hash(TDB_DATA *key)
3712 {
3713         u32 value;      /* Used to compute the hash value.  */
3714         u32   i;        /* Used to cycle through random values. */
3715
3716         /* Set the initial value from the key size. */
3717         for (value = 0, i=0; i < key->dsize; i++)
3718                 value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3719
3720         return value;
3721 }
3722
3723
3724 /* initialise a new database with a specified hash size */
3725 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3726 {
3727         struct tdb_header *newdb;
3728         int size, ret = -1;
3729
3730         /* We make it up in memory, then write it out if not internal */
3731         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3732         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3733                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3734
3735         /* Fill in the header */
3736         newdb->version = TDB_VERSION;
3737         newdb->hash_size = hash_size;
3738         if (tdb->flags & TDB_INTERNAL) {
3739                 tdb->map_size = size;
3740                 tdb->map_ptr = (char *)newdb;
3741                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3742                 /* Convert the `ondisk' version if asked. */
3743                 CONVERT(*newdb);
3744                 return 0;
3745         }
3746         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3747                 goto fail;
3748
3749         if (ftruncate(tdb->fd, 0) == -1)
3750                 goto fail;
3751
3752         /* This creates an endian-converted header, as if read from disk */
3753         CONVERT(*newdb);
3754         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3755         /* Don't endian-convert the magic food! */
3756         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3757         if (write(tdb->fd, newdb, size) != size) {
3758                 ret = -1;
3759         } else {
3760                 ret = 0;
3761         }
3762
3763   fail:
3764         SAFE_FREE(newdb);
3765         return ret;
3766 }
3767
3768
3769
3770 static int tdb_already_open(dev_t device,
3771                             ino_t ino)
3772 {
3773         struct tdb_context *i;
3774
3775         for (i = tdbs; i; i = i->next) {
3776                 if (i->device == device && i->inode == ino) {
3777                         return 1;
3778                 }
3779         }
3780
3781         return 0;
3782 }
3783
3784 /* open the database, creating it if necessary
3785
3786    The open_flags and mode are passed straight to the open call on the
3787    database file. A flags value of O_WRONLY is invalid. The hash size
3788    is advisory, use zero for a default value.
3789
3790    Return is NULL on error, in which case errno is also set.  Don't
3791    try to call tdb_error or tdb_errname, just do strerror(errno).
3792
3793    @param name may be NULL for internal databases. */
3794 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3795                       int open_flags, mode_t mode)
3796 {
3797         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3798 }
3799
3800 /* a default logging function */
3801 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3802 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3803 {
3804 }
3805
3806
3807 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3808                                 int open_flags, mode_t mode,
3809                                 const struct tdb_logging_context *log_ctx,
3810                                 tdb_hash_func hash_fn)
3811 {
3812         struct tdb_context *tdb;
3813         struct stat st;
3814         int rev = 0, locked = 0;
3815         unsigned char *vp;
3816         u32 vertest;
3817
3818         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3819                 /* Can't log this */
3820                 errno = ENOMEM;
3821                 goto fail;
3822         }
3823         tdb_io_init(tdb);
3824         tdb->fd = -1;
3825         tdb->name = NULL;
3826         tdb->map_ptr = NULL;
3827         tdb->flags = tdb_flags;
3828         tdb->open_flags = open_flags;
3829         if (log_ctx) {
3830                 tdb->log = *log_ctx;
3831         } else {
3832                 tdb->log.log_fn = null_log_fn;
3833                 tdb->log.log_private = NULL;
3834         }
3835         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3836
3837         /* cache the page size */
3838         tdb->page_size = sysconf(_SC_PAGESIZE);
3839         if (tdb->page_size <= 0) {
3840                 tdb->page_size = 0x2000;
3841         }
3842
3843         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3844                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3845                          name));
3846                 errno = EINVAL;
3847                 goto fail;
3848         }
3849
3850         if (hash_size == 0)
3851                 hash_size = DEFAULT_HASH_SIZE;
3852         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3853                 tdb->read_only = 1;
3854                 /* read only databases don't do locking or clear if first */
3855                 tdb->flags |= TDB_NOLOCK;
3856                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3857         }
3858
3859         /* internal databases don't mmap or lock, and start off cleared */
3860         if (tdb->flags & TDB_INTERNAL) {
3861                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3862                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3863                 if (tdb_new_database(tdb, hash_size) != 0) {
3864                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3865                         goto fail;
3866                 }
3867                 goto internal;
3868         }
3869
3870         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3871                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3872                          name, strerror(errno)));
3873                 goto fail;      /* errno set by open(2) */
3874         }
3875
3876         /* ensure there is only one process initialising at once */
3877         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3878                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3879                          name, strerror(errno)));
3880                 goto fail;      /* errno set by tdb_brlock */
3881         }
3882
3883         /* we need to zero database if we are the only one with it open */
3884         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3885             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3886                 open_flags |= O_CREAT;
3887                 if (ftruncate(tdb->fd, 0) == -1) {
3888                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3889                                  "failed to truncate %s: %s\n",
3890                                  name, strerror(errno)));
3891                         goto fail; /* errno set by ftruncate */
3892                 }
3893         }
3894
3895         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3896             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3897             || (tdb->header.version != TDB_VERSION
3898                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3899                 /* its not a valid database - possibly initialise it */
3900                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3901                         errno = EIO; /* ie bad format or something */
3902                         goto fail;
3903                 }
3904                 rev = (tdb->flags & TDB_CONVERT);
3905         }
3906         vp = (unsigned char *)&tdb->header.version;
3907         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3908                   (((u32)vp[2]) << 8) | (u32)vp[3];
3909         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3910         if (!rev)
3911                 tdb->flags &= ~TDB_CONVERT;
3912         else {
3913                 tdb->flags |= TDB_CONVERT;
3914                 tdb_convert(&tdb->header, sizeof(tdb->header));
3915         }
3916         if (fstat(tdb->fd, &st) == -1)
3917                 goto fail;
3918
3919         if (tdb->header.rwlocks != 0) {
3920                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3921                 goto fail;
3922         }
3923
3924         /* Is it already in the open list?  If so, fail. */
3925         if (tdb_already_open(st.st_dev, st.st_ino)) {
3926                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3927                          "%s (%d,%d) is already open in this process\n",
3928                          name, (int)st.st_dev, (int)st.st_ino));
3929                 errno = EBUSY;
3930                 goto fail;
3931         }
3932
3933         if (!(tdb->name = (char *)strdup(name))) {
3934                 errno = ENOMEM;
3935                 goto fail;
3936         }
3937
3938         tdb->map_size = st.st_size;
3939         tdb->device = st.st_dev;
3940         tdb->inode = st.st_ino;
3941         tdb->max_dead_records = 0;
3942         tdb_mmap(tdb);
3943         if (locked) {
3944                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3945                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3946                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3947                                  name, strerror(errno)));
3948                         goto fail;
3949                 }
3950
3951         }
3952
3953         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3954            we didn't get the initial exclusive lock as we need to let all other
3955            users know we're using it. */
3956
3957         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3958                 /* leave this lock in place to indicate it's in use */
3959                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3960                         goto fail;
3961         }
3962
3963         /* if needed, run recovery */
3964         if (tdb_transaction_recover(tdb) == -1) {
3965                 goto fail;
3966         }
3967
3968  internal:
3969         /* Internal (memory-only) databases skip all the code above to
3970          * do with disk files, and resume here by releasing their
3971          * global lock and hooking into the active list. */
3972         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3973                 goto fail;
3974         tdb->next = tdbs;
3975         tdbs = tdb;
3976         return tdb;
3977
3978  fail:
3979         { int save_errno = errno;
3980
3981         if (!tdb)
3982                 return NULL;
3983
3984         if (tdb->map_ptr) {
3985                 if (tdb->flags & TDB_INTERNAL)
3986                         SAFE_FREE(tdb->map_ptr);
3987                 else
3988                         tdb_munmap(tdb);
3989         }
3990         SAFE_FREE(tdb->name);
3991         if (tdb->fd != -1)
3992                 if (close(tdb->fd) != 0)
3993                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3994         SAFE_FREE(tdb);
3995         errno = save_errno;
3996         return NULL;
3997         }
3998 }
3999
4000 /*
4001  * Set the maximum number of dead records per hash chain
4002  */
4003
4004 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4005 {
4006         tdb->max_dead_records = max_dead;
4007 }
4008
4009 /**
4010  * Close a database.
4011  *
4012  * @returns -1 for error; 0 for success.
4013  **/
4014 int tdb_close(struct tdb_context *tdb)
4015 {
4016         struct tdb_context **i;
4017         int ret = 0;
4018
4019         if (tdb->transaction) {
4020                 tdb_transaction_cancel(tdb);
4021         }
4022
4023         if (tdb->map_ptr) {
4024                 if (tdb->flags & TDB_INTERNAL)
4025                         SAFE_FREE(tdb->map_ptr);
4026                 else
4027                         tdb_munmap(tdb);
4028         }
4029         SAFE_FREE(tdb->name);
4030         if (tdb->fd != -1)
4031                 ret = close(tdb->fd);
4032         SAFE_FREE(tdb->lockrecs);
4033
4034         /* Remove from contexts list */
4035         for (i = &tdbs; *i; i = &(*i)->next) {
4036                 if (*i == tdb) {
4037                         *i = tdb->next;
4038                         break;
4039                 }
4040         }
4041
4042         memset(tdb, 0, sizeof(*tdb));
4043         SAFE_FREE(tdb);
4044
4045         return ret;
4046 }
4047
4048 /* register a loging function */
4049 void tdb_set_logging_function(struct tdb_context *tdb,
4050                               const struct tdb_logging_context *log_ctx)
4051 {
4052         tdb->log = *log_ctx;
4053 }
4054
4055 void *tdb_get_logging_private(struct tdb_context *tdb)
4056 {
4057         return tdb->log.log_private;
4058 }
4059
4060 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4061    seek pointer from our parent and to re-establish locks */
4062 int tdb_reopen(struct tdb_context *tdb)
4063 {
4064         struct stat st;
4065
4066         if (tdb->flags & TDB_INTERNAL) {
4067                 return 0; /* Nothing to do. */
4068         }
4069
4070         if (tdb->num_locks != 0 || tdb->global_lock.count) {
4071                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4072                 goto fail;
4073         }
4074
4075         if (tdb->transaction != 0) {
4076                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4077                 goto fail;
4078         }
4079
4080         if (tdb_munmap(tdb) != 0) {
4081                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4082                 goto fail;
4083         }
4084         if (close(tdb->fd) != 0)
4085                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4086         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4087         if (tdb->fd == -1) {
4088                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4089                 goto fail;
4090         }
4091         if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4092             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4094                 goto fail;
4095         }
4096         if (fstat(tdb->fd, &st) != 0) {
4097                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4098                 goto fail;
4099         }
4100         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4101                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4102                 goto fail;
4103         }
4104         tdb_mmap(tdb);
4105
4106         return 0;
4107
4108 fail:
4109         tdb_close(tdb);
4110         return -1;
4111 }
4112
4113 /* reopen all tdb's */
4114 int tdb_reopen_all(int parent_longlived)
4115 {
4116         struct tdb_context *tdb;
4117
4118         for (tdb=tdbs; tdb; tdb = tdb->next) {
4119                 /*
4120                  * If the parent is longlived (ie. a
4121                  * parent daemon architecture), we know
4122                  * it will keep it's active lock on a
4123                  * tdb opened with CLEAR_IF_FIRST. Thus
4124                  * for child processes we don't have to
4125                  * add an active lock. This is essential
4126                  * to improve performance on systems that
4127                  * keep POSIX locks as a non-scalable data
4128                  * structure in the kernel.
4129                  */
4130                 if (parent_longlived) {
4131                         /* Ensure no clear-if-first. */
4132                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4133                 }
4134
4135                 if (tdb_reopen(tdb) != 0)
4136                         return -1;
4137         }
4138
4139         return 0;
4140 }