Whamcloud - gitweb
Fix miscellaneous compiler warnings using "make gcc-wall"
[tools/e2fsprogs.git] / lib / ext2fs / tdb.c
1 /*
2 URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3 Rev: 23590
4 Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5 */
6  /*
7    trivial database library - standalone version
8
9    Copyright (C) Andrew Tridgell              1999-2005
10    Copyright (C) Jeremy Allison               2000-2006
11    Copyright (C) Paul `Rusty' Russell         2000
12
13      ** NOTE! The following LGPL license applies to the tdb
14      ** library. This does NOT imply that all of Samba is released
15      ** under the LGPL
16
17    This library is free software; you can redistribute it and/or
18    modify it under the terms of the GNU Lesser General Public
19    License as published by the Free Software Foundation; either
20    version 2 of the License, or (at your option) any later version.
21
22    This library is distributed in the hope that it will be useful,
23    but WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25    Lesser General Public License for more details.
26
27    You should have received a copy of the GNU Lesser General Public
28    License along with this library; if not, write to the Free Software
29    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
30 */
31
32 #ifdef CONFIG_STAND_ALONE
33 #define HAVE_MMAP
34 #define HAVE_STRDUP
35 #define HAVE_SYS_MMAN_H
36 #define HAVE_UTIME_H
37 #define HAVE_UTIME
38 #endif
39 #ifndef __FreeBSD__
40 #define _XOPEN_SOURCE 600
41 #endif
42
43 #include "config.h"
44 #include <unistd.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <stdarg.h>
48 #include <stddef.h>
49 #include <errno.h>
50 #include <string.h>
51 #ifdef HAVE_SYS_SELECT_H
52 #include <sys/select.h>
53 #endif
54 #include <sys/time.h>
55 #include <sys/types.h>
56 #include <time.h>
57 #ifdef HAVE_UTIME_H
58 #include <utime.h>
59 #endif
60 #include <sys/stat.h>
61 #include <sys/file.h>
62 #include <fcntl.h>
63
64 #ifdef HAVE_SYS_MMAN_H
65 #include <sys/mman.h>
66 #endif
67
68 #ifdef __GNUC__
69 #define EXT2FS_ATTR(x) __attribute__(x)
70 #else
71 #define EXT2FS_ATTR(x)
72 #endif
73
74 #ifndef MAP_FILE
75 #define MAP_FILE 0
76 #endif
77
78 #ifndef MAP_FAILED
79 #define MAP_FAILED ((void *)-1)
80 #endif
81
82 #ifndef HAVE_STRDUP
83 #define strdup rep_strdup
84 static char *rep_strdup(const char *s)
85 {
86         char *ret;
87         int length;
88
89         if (!s)
90                 return NULL;
91         length = strlen(s);
92         ret = malloc(length + 1);
93         if (ret) {
94                 strncpy(ret, s, length);
95                 ret[length] = '\0';
96         }
97         return ret;
98 }
99 #endif
100
101 #ifndef PRINTF_ATTRIBUTE
102 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
103 /** Use gcc attribute to check printf fns.  a1 is the 1-based index of
104  * the parameter containing the format, and a2 the index of the first
105  * argument. Note that some gcc 2.x versions don't handle this
106  * properly **/
107 #define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
108 #else
109 #define PRINTF_ATTRIBUTE(a1, a2)
110 #endif
111 #endif
112
113 typedef int bool;
114
115 #include "tdb.h"
116
117 static TDB_DATA tdb_null;
118
119 #ifndef u32
120 #define u32 unsigned
121 #endif
122
123 typedef u32 tdb_len_t;
124 typedef u32 tdb_off_t;
125
126 #ifndef offsetof
127 #define offsetof(t,f) ((unsigned int)&((t *)0)->f)
128 #endif
129
130 #define TDB_MAGIC_FOOD "TDB file\n"
131 #define TDB_VERSION (0x26011967 + 6)
132 #define TDB_MAGIC (0x26011999U)
133 #define TDB_FREE_MAGIC (~TDB_MAGIC)
134 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
135 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
136 #define TDB_ALIGNMENT 4
137 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
138 #define DEFAULT_HASH_SIZE 131
139 #define FREELIST_TOP (sizeof(struct tdb_header))
140 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
141 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
142 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
143 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
144 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
145 #define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
146 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
147 #define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
148 #define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
149 #define TDB_PAD_BYTE 0x42
150 #define TDB_PAD_U32  0x42424242
151
152 /* NB assumes there is a local variable called "tdb" that is the
153  * current context, also takes doubly-parenthesized print-style
154  * argument. */
155 #define TDB_LOG(x) tdb->log.log_fn x
156
157 /* lock offsets */
158 #define GLOBAL_LOCK      0
159 #define ACTIVE_LOCK      4
160 #define TRANSACTION_LOCK 8
161
162 /* free memory if the pointer is valid and zero the pointer */
163 #ifndef SAFE_FREE
164 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
165 #endif
166
167 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
168
169 #define DOCONV() (tdb->flags & TDB_CONVERT)
170 #define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
171
172
173 /* the body of the database is made of one list_struct for the free space
174    plus a separate data list for each hash value */
175 struct list_struct {
176         tdb_off_t next; /* offset of the next record in the list */
177         tdb_len_t rec_len; /* total byte length of record */
178         tdb_len_t key_len; /* byte length of key */
179         tdb_len_t data_len; /* byte length of data */
180         u32 full_hash; /* the full 32 bit hash of the key */
181         u32 magic;   /* try to catch errors */
182         /* the following union is implied:
183                 union {
184                         char record[rec_len];
185                         struct {
186                                 char key[key_len];
187                                 char data[data_len];
188                         }
189                         u32 totalsize; (tailer)
190                 }
191         */
192 };
193
194
195 /* this is stored at the front of every database */
196 struct tdb_header {
197         char magic_food[32]; /* for /etc/magic */
198         u32 version; /* version of the code */
199         u32 hash_size; /* number of hash entries */
200         tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
201         tdb_off_t recovery_start; /* offset of transaction recovery region */
202         tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
203         tdb_off_t reserved[29];
204 };
205
206 struct tdb_lock_type {
207         int list;
208         u32 count;
209         u32 ltype;
210 };
211
212 struct tdb_traverse_lock {
213         struct tdb_traverse_lock *next;
214         u32 off;
215         u32 hash;
216         int lock_rw;
217 };
218
219
220 struct tdb_methods {
221         int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
222         int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
223         void (*next_hash_chain)(struct tdb_context *, u32 *);
224         int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
225         int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
226         int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
227 };
228
229 struct tdb_context {
230         char *name; /* the name of the database */
231         void *map_ptr; /* where it is currently mapped */
232         int fd; /* open file descriptor for the database */
233         tdb_len_t map_size; /* how much space has been mapped */
234         int read_only; /* opened read-only */
235         int traverse_read; /* read-only traversal */
236         struct tdb_lock_type global_lock;
237         int num_lockrecs;
238         struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
239         enum TDB_ERROR ecode; /* error code for last tdb error */
240         struct tdb_header header; /* a cached copy of the header */
241         u32 flags; /* the flags passed to tdb_open */
242         struct tdb_traverse_lock travlocks; /* current traversal locks */
243         struct tdb_context *next; /* all tdbs to avoid multiple opens */
244         dev_t device;   /* uniquely identifies this tdb */
245         ino_t inode;    /* uniquely identifies this tdb */
246         struct tdb_logging_context log;
247         unsigned int (*hash_fn)(TDB_DATA *key);
248         int open_flags; /* flags used in the open - needed by reopen */
249         unsigned int num_locks; /* number of chain locks held */
250         const struct tdb_methods *methods;
251         struct tdb_transaction *transaction;
252         int page_size;
253         int max_dead_records;
254         bool have_transaction_lock;
255         tdb_len_t real_map_size; /* how much space has been mapped */
256 };
257
258
259 /*
260   internal prototypes
261 */
262 static int tdb_munmap(struct tdb_context *tdb);
263 static void tdb_mmap(struct tdb_context *tdb);
264 static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
265 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
266 static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
267 static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
268 static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
269 static int tdb_transaction_unlock(struct tdb_context *tdb);
270 static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
271 static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
272 static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
273 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
274 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
275 static void *tdb_convert(void *buf, u32 size);
276 static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
277 static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
278 static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
279 static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
280 static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
281 static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
282 static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
283 static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
284 static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
285 static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
286 static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
287                    tdb_off_t offset, tdb_len_t len,
288                    int (*parser)(TDB_DATA key, TDB_DATA data,
289                                  void *private_data),
290                    void *private_data);
291 static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
292                            struct list_struct *rec);
293 static void tdb_io_init(struct tdb_context *tdb);
294 static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
295 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
296                       struct list_struct *rec);
297
298
299 /* file: error.c */
300
301 enum TDB_ERROR tdb_error(struct tdb_context *tdb)
302 {
303         return tdb->ecode;
304 }
305
306 static struct tdb_errname {
307         enum TDB_ERROR ecode; const char *estring;
308 } emap[] = { {TDB_SUCCESS, "Success"},
309              {TDB_ERR_CORRUPT, "Corrupt database"},
310              {TDB_ERR_IO, "IO Error"},
311              {TDB_ERR_LOCK, "Locking error"},
312              {TDB_ERR_OOM, "Out of memory"},
313              {TDB_ERR_EXISTS, "Record exists"},
314              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
315              {TDB_ERR_EINVAL, "Invalid parameter"},
316              {TDB_ERR_NOEXIST, "Record does not exist"},
317              {TDB_ERR_RDONLY, "write not permitted"} };
318
319 /* Error string for the last tdb error */
320 const char *tdb_errorstr(struct tdb_context *tdb)
321 {
322         u32 i;
323         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
324                 if (tdb->ecode == emap[i].ecode)
325                         return emap[i].estring;
326         return "Invalid error code";
327 }
328
329 /* file: lock.c */
330
331 #define TDB_MARK_LOCK 0x80000000
332
333 /* a byte range locking function - return 0 on success
334    this functions locks/unlocks 1 byte at the specified offset.
335
336    On error, errno is also set so that errors are passed back properly
337    through tdb_open().
338
339    note that a len of zero means lock to end of file
340 */
341 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
342                int rw_type, int lck_type, int probe, size_t len)
343 {
344         struct flock fl;
345         int ret;
346
347         if (tdb->flags & TDB_NOLOCK) {
348                 return 0;
349         }
350
351         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
352                 tdb->ecode = TDB_ERR_RDONLY;
353                 return -1;
354         }
355
356         fl.l_type = rw_type;
357         fl.l_whence = SEEK_SET;
358         fl.l_start = offset;
359         fl.l_len = len;
360         fl.l_pid = 0;
361
362         do {
363                 ret = fcntl(tdb->fd,lck_type,&fl);
364         } while (ret == -1 && errno == EINTR);
365
366         if (ret == -1) {
367                 /* Generic lock error. errno set by fcntl.
368                  * EAGAIN is an expected return from non-blocking
369                  * locks. */
370                 if (!probe && lck_type != F_SETLK) {
371                         /* Ensure error code is set for log fun to examine. */
372                         tdb->ecode = TDB_ERR_LOCK;
373                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
374                                  tdb->fd, offset, rw_type, lck_type, (int)len));
375                 }
376                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
377         }
378         return 0;
379 }
380
381
382 /*
383   upgrade a read lock to a write lock. This needs to be handled in a
384   special way as some OSes (such as solaris) have too conservative
385   deadlock detection and claim a deadlock when progress can be
386   made. For those OSes we may loop for a while.
387 */
388 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
389 {
390         int count = 1000;
391         while (count--) {
392                 struct timeval tv;
393                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
394                         return 0;
395                 }
396                 if (errno != EDEADLK) {
397                         break;
398                 }
399                 /* sleep for as short a time as we can - more portable than usleep() */
400                 tv.tv_sec = 0;
401                 tv.tv_usec = 1;
402                 select(0, NULL, NULL, NULL, &tv);
403         }
404         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
405         return -1;
406 }
407
408
409 /* lock a list in the database. list -1 is the alloc list */
410 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
411 {
412         struct tdb_lock_type *new_lck;
413         int i;
414         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
415
416         ltype &= ~TDB_MARK_LOCK;
417
418         /* a global lock allows us to avoid per chain locks */
419         if (tdb->global_lock.count &&
420             ((u32)ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
421                 return 0;
422         }
423
424         if (tdb->global_lock.count) {
425                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
426         }
427
428         if (list < -1 || list >= (int)tdb->header.hash_size) {
429                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
430                            list, ltype));
431                 return -1;
432         }
433         if (tdb->flags & TDB_NOLOCK)
434                 return 0;
435
436         for (i=0; i<tdb->num_lockrecs; i++) {
437                 if (tdb->lockrecs[i].list == list) {
438                         if (tdb->lockrecs[i].count == 0) {
439                                 /*
440                                  * Can't happen, see tdb_unlock(). It should
441                                  * be an assert.
442                                  */
443                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
444                                          "lck->count == 0 for list %d", list));
445                         }
446                         /*
447                          * Just increment the in-memory struct, posix locks
448                          * don't stack.
449                          */
450                         tdb->lockrecs[i].count++;
451                         return 0;
452                 }
453         }
454
455         new_lck = (struct tdb_lock_type *)realloc(
456                 tdb->lockrecs,
457                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
458         if (new_lck == NULL) {
459                 errno = ENOMEM;
460                 return -1;
461         }
462         tdb->lockrecs = new_lck;
463
464         /* Since fcntl locks don't nest, we do a lock for the first one,
465            and simply bump the count for future ones */
466         if (!mark_lock &&
467             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
468                                      0, 1)) {
469                 return -1;
470         }
471
472         tdb->num_locks++;
473
474         tdb->lockrecs[tdb->num_lockrecs].list = list;
475         tdb->lockrecs[tdb->num_lockrecs].count = 1;
476         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
477         tdb->num_lockrecs += 1;
478
479         return 0;
480 }
481
482 /* lock a list in the database. list -1 is the alloc list */
483 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
484 {
485         int ret;
486         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
487         if (ret) {
488                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
489                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
490         }
491         return ret;
492 }
493
494 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
495 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
496 {
497         return _tdb_lock(tdb, list, ltype, F_SETLK);
498 }
499
500
501 /* unlock the database: returns void because it's too late for errors. */
502         /* changed to return int it may be interesting to know there
503            has been an error  --simo */
504 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
505 {
506         int ret = -1;
507         int i;
508         struct tdb_lock_type *lck = NULL;
509         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
510
511         ltype &= ~TDB_MARK_LOCK;
512
513         /* a global lock allows us to avoid per chain locks */
514         if (tdb->global_lock.count &&
515             ((u32)ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
516                 return 0;
517         }
518
519         if (tdb->global_lock.count) {
520                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
521         }
522
523         if (tdb->flags & TDB_NOLOCK)
524                 return 0;
525
526         /* Sanity checks */
527         if (list < -1 || list >= (int)tdb->header.hash_size) {
528                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
529                 return ret;
530         }
531
532         for (i=0; i<tdb->num_lockrecs; i++) {
533                 if (tdb->lockrecs[i].list == list) {
534                         lck = &tdb->lockrecs[i];
535                         break;
536                 }
537         }
538
539         if ((lck == NULL) || (lck->count == 0)) {
540                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
541                 return -1;
542         }
543
544         if (lck->count > 1) {
545                 lck->count--;
546                 return 0;
547         }
548
549         /*
550          * This lock has count==1 left, so we need to unlock it in the
551          * kernel. We don't bother with decrementing the in-memory array
552          * element, we're about to overwrite it with the last array element
553          * anyway.
554          */
555
556         if (mark_lock) {
557                 ret = 0;
558         } else {
559                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
560                                                F_SETLKW, 0, 1);
561         }
562         tdb->num_locks--;
563
564         /*
565          * Shrink the array by overwriting the element just unlocked with the
566          * last array element.
567          */
568
569         if (tdb->num_lockrecs > 1) {
570                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
571         }
572         tdb->num_lockrecs -= 1;
573
574         /*
575          * We don't bother with realloc when the array shrinks, but if we have
576          * a completely idle tdb we should get rid of the locked array.
577          */
578
579         if (tdb->num_lockrecs == 0) {
580                 SAFE_FREE(tdb->lockrecs);
581         }
582
583         if (ret)
584                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
585         return ret;
586 }
587
588 /*
589   get the transaction lock
590  */
591 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
592 {
593         if (tdb->have_transaction_lock || tdb->global_lock.count) {
594                 return 0;
595         }
596         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
597                                      F_SETLKW, 0, 1) == -1) {
598                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
599                 tdb->ecode = TDB_ERR_LOCK;
600                 return -1;
601         }
602         tdb->have_transaction_lock = 1;
603         return 0;
604 }
605
606 /*
607   release the transaction lock
608  */
609 int tdb_transaction_unlock(struct tdb_context *tdb)
610 {
611         int ret;
612         if (!tdb->have_transaction_lock) {
613                 return 0;
614         }
615         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
616         if (ret == 0) {
617                 tdb->have_transaction_lock = 0;
618         }
619         return ret;
620 }
621
622
623
624
625 /* lock/unlock entire database */
626 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
627 {
628         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
629
630         ltype &= ~TDB_MARK_LOCK;
631
632         /* There are no locks on read-only dbs */
633         if (tdb->read_only || tdb->traverse_read)
634                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
635
636         if (tdb->global_lock.count && tdb->global_lock.ltype == (u32)ltype) {
637                 tdb->global_lock.count++;
638                 return 0;
639         }
640
641         if (tdb->global_lock.count) {
642                 /* a global lock of a different type exists */
643                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
644         }
645
646         if (tdb->num_locks != 0) {
647                 /* can't combine global and chain locks */
648                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
649         }
650
651         if (!mark_lock &&
652             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
653                                      0, 4*tdb->header.hash_size)) {
654                 if (op == F_SETLKW) {
655                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
656                 }
657                 return -1;
658         }
659
660         tdb->global_lock.count = 1;
661         tdb->global_lock.ltype = ltype;
662
663         return 0;
664 }
665
666
667
668 /* unlock entire db */
669 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
670 {
671         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
672
673         ltype &= ~TDB_MARK_LOCK;
674
675         /* There are no locks on read-only dbs */
676         if (tdb->read_only || tdb->traverse_read) {
677                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
678         }
679
680         if (tdb->global_lock.ltype != (u32)ltype ||
681             tdb->global_lock.count == 0) {
682                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
683         }
684
685         if (tdb->global_lock.count > 1) {
686                 tdb->global_lock.count--;
687                 return 0;
688         }
689
690         if (!mark_lock &&
691             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
692                                      0, 4*tdb->header.hash_size)) {
693                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
694                 return -1;
695         }
696
697         tdb->global_lock.count = 0;
698         tdb->global_lock.ltype = 0;
699
700         return 0;
701 }
702
703 /* lock entire database with write lock */
704 int tdb_lockall(struct tdb_context *tdb)
705 {
706         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
707 }
708
709 /* lock entire database with write lock - mark only */
710 int tdb_lockall_mark(struct tdb_context *tdb)
711 {
712         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
713 }
714
715 /* unlock entire database with write lock - unmark only */
716 int tdb_lockall_unmark(struct tdb_context *tdb)
717 {
718         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
719 }
720
721 /* lock entire database with write lock - nonblocking variant */
722 int tdb_lockall_nonblock(struct tdb_context *tdb)
723 {
724         return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
725 }
726
727 /* unlock entire database with write lock */
728 int tdb_unlockall(struct tdb_context *tdb)
729 {
730         return _tdb_unlockall(tdb, F_WRLCK);
731 }
732
733 /* lock entire database with read lock */
734 int tdb_lockall_read(struct tdb_context *tdb)
735 {
736         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
737 }
738
739 /* lock entire database with read lock - nonblock variant */
740 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
741 {
742         return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
743 }
744
745 /* unlock entire database with read lock */
746 int tdb_unlockall_read(struct tdb_context *tdb)
747 {
748         return _tdb_unlockall(tdb, F_RDLCK);
749 }
750
751 /* lock/unlock one hash chain. This is meant to be used to reduce
752    contention - it cannot guarantee how many records will be locked */
753 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
754 {
755         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
756 }
757
758 /* lock/unlock one hash chain, non-blocking. This is meant to be used
759    to reduce contention - it cannot guarantee how many records will be
760    locked */
761 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
762 {
763         return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
764 }
765
766 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
767 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
768 {
769         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
770 }
771
772 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
773 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
774 {
775         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
776 }
777
778 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
779 {
780         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
781 }
782
783 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
784 {
785         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
786 }
787
788 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
789 {
790         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
791 }
792
793
794
795 /* record lock stops delete underneath */
796 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
797 {
798         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
799 }
800
801 /*
802   Write locks override our own fcntl readlocks, so check it here.
803   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
804   an error to fail to get the lock here.
805 */
806 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
807 {
808         struct tdb_traverse_lock *i;
809         for (i = &tdb->travlocks; i; i = i->next)
810                 if (i->off == off)
811                         return -1;
812         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
813 }
814
815 /*
816   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
817   an error to fail to get the lock here.
818 */
819 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
820 {
821         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
822 }
823
824 /* fcntl locks don't stack: avoid unlocking someone else's */
825 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
826 {
827         struct tdb_traverse_lock *i;
828         u32 count = 0;
829
830         if (off == 0)
831                 return 0;
832         for (i = &tdb->travlocks; i; i = i->next)
833                 if (i->off == off)
834                         count++;
835         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
836 }
837
838 /* file: io.c */
839
840 /* check for an out of bounds access - if it is out of bounds then
841    see if the database has been expanded by someone else and expand
842    if necessary
843    note that "len" is the minimum length needed for the db
844 */
845 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
846 {
847         struct stat st;
848         if (len <= tdb->map_size)
849                 return 0;
850         if (tdb->flags & TDB_INTERNAL) {
851                 if (!probe) {
852                         /* Ensure ecode is set for log fn. */
853                         tdb->ecode = TDB_ERR_IO;
854                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
855                                  (int)len, (int)tdb->map_size));
856                 }
857                 return TDB_ERRCODE(TDB_ERR_IO, -1);
858         }
859
860         if (fstat(tdb->fd, &st) == -1) {
861                 return TDB_ERRCODE(TDB_ERR_IO, -1);
862         }
863
864         if (st.st_size < (off_t)len) {
865                 if (!probe) {
866                         /* Ensure ecode is set for log fn. */
867                         tdb->ecode = TDB_ERR_IO;
868                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
869                                  (int)len, (int)st.st_size));
870                 }
871                 return TDB_ERRCODE(TDB_ERR_IO, -1);
872         }
873
874         /* Unmap, update size, remap */
875         if (tdb_munmap(tdb) == -1)
876                 return TDB_ERRCODE(TDB_ERR_IO, -1);
877         tdb->map_size = st.st_size;
878         tdb_mmap(tdb);
879         return 0;
880 }
881
882 /* write a lump of data at a specified offset */
883 static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
884                      const void *buf, tdb_len_t len)
885 {
886         if (len == 0) {
887                 return 0;
888         }
889
890         if (tdb->read_only || tdb->traverse_read) {
891                 tdb->ecode = TDB_ERR_RDONLY;
892                 return -1;
893         }
894
895         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
896                 return -1;
897
898         if (tdb->map_ptr) {
899                 memcpy(off + (char *)tdb->map_ptr, buf, len);
900         } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
901                 /* Ensure ecode is set for log fn. */
902                 tdb->ecode = TDB_ERR_IO;
903                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
904                            off, len, strerror(errno)));
905                 return TDB_ERRCODE(TDB_ERR_IO, -1);
906         }
907         return 0;
908 }
909
910 /* Endian conversion: we only ever deal with 4 byte quantities */
911 void *tdb_convert(void *buf, u32 size)
912 {
913         u32 i, *p = (u32 *)buf;
914         for (i = 0; i < size / 4; i++)
915                 p[i] = TDB_BYTEREV(p[i]);
916         return buf;
917 }
918
919
920 /* read a lump of data at a specified offset, maybe convert */
921 static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
922                     tdb_len_t len, int cv)
923 {
924         if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
925                 return -1;
926         }
927
928         if (tdb->map_ptr) {
929                 memcpy(buf, off + (char *)tdb->map_ptr, len);
930         } else {
931                 ssize_t ret = pread(tdb->fd, buf, len, off);
932                 if (ret != (ssize_t)len) {
933                         /* Ensure ecode is set for log fn. */
934                         tdb->ecode = TDB_ERR_IO;
935                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
936                                  "len=%d ret=%d (%s) map_size=%d\n",
937                                  (int)off, (int)len, (int)ret, strerror(errno),
938                                  (int)tdb->map_size));
939                         return TDB_ERRCODE(TDB_ERR_IO, -1);
940                 }
941         }
942         if (cv) {
943                 tdb_convert(buf, len);
944         }
945         return 0;
946 }
947
948
949
950 /*
951   do an unlocked scan of the hash table heads to find the next non-zero head. The value
952   will then be confirmed with the lock held
953 */
954 static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
955 {
956         u32 h = *chain;
957         if (tdb->map_ptr) {
958                 for (;h < tdb->header.hash_size;h++) {
959                         if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
960                                 break;
961                         }
962                 }
963         } else {
964                 u32 off=0;
965                 for (;h < tdb->header.hash_size;h++) {
966                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
967                                 break;
968                         }
969                 }
970         }
971         (*chain) = h;
972 }
973
974
975 int tdb_munmap(struct tdb_context *tdb)
976 {
977         if (tdb->flags & TDB_INTERNAL)
978                 return 0;
979
980 #ifdef HAVE_MMAP
981         if (tdb->map_ptr) {
982                 int ret = munmap(tdb->map_ptr, tdb->real_map_size);
983                 if (ret != 0)
984                         return ret;
985                 tdb->real_map_size = 0;
986         }
987 #endif
988         tdb->map_ptr = NULL;
989         return 0;
990 }
991
992 void tdb_mmap(struct tdb_context *tdb)
993 {
994         if (tdb->flags & TDB_INTERNAL)
995                 return;
996
997 #ifdef HAVE_MMAP
998         if (!(tdb->flags & TDB_NOMMAP)) {
999                 tdb->map_ptr = mmap(NULL, tdb->map_size,
1000                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE),
1001                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
1002
1003                 /*
1004                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
1005                  */
1006
1007                 if (tdb->map_ptr == MAP_FAILED) {
1008                         tdb->real_map_size = 0;
1009                         tdb->map_ptr = NULL;
1010                         TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1011                                  tdb->map_size, strerror(errno)));
1012                 }
1013                 tdb->real_map_size = tdb->map_size;
1014         } else {
1015                 tdb->map_ptr = NULL;
1016         }
1017 #else
1018         tdb->map_ptr = NULL;
1019 #endif
1020 }
1021
1022 /* expand a file.  we prefer to use ftruncate, as that is what posix
1023   says to use for mmap expansion */
1024 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1025 {
1026         char buf[1024];
1027
1028         if (tdb->read_only || tdb->traverse_read) {
1029                 tdb->ecode = TDB_ERR_RDONLY;
1030                 return -1;
1031         }
1032
1033         if (ftruncate(tdb->fd, size+addition) == -1) {
1034                 char b = 0;
1035                 if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1036                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1037                                  size+addition, strerror(errno)));
1038                         return -1;
1039                 }
1040         }
1041
1042         /* now fill the file with something. This ensures that the
1043            file isn't sparse, which would be very bad if we ran out of
1044            disk. This must be done with write, not via mmap */
1045         memset(buf, TDB_PAD_BYTE, sizeof(buf));
1046         while (addition) {
1047                 int n = addition>sizeof(buf)?sizeof(buf):addition;
1048                 int ret = pwrite(tdb->fd, buf, n, size);
1049                 if (ret != n) {
1050                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1051                                    n, strerror(errno)));
1052                         return -1;
1053                 }
1054                 addition -= n;
1055                 size += n;
1056         }
1057         return 0;
1058 }
1059
1060
1061 /* expand the database at least size bytes by expanding the underlying
1062    file and doing the mmap again if necessary */
1063 int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1064 {
1065         struct list_struct rec;
1066         tdb_off_t offset;
1067
1068         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1069                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1070                 return -1;
1071         }
1072
1073         /* must know about any previous expansions by another process */
1074         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1075
1076         /* always make room for at least 10 more records, and round
1077            the database up to a multiple of the page size */
1078         size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1079
1080         if (!(tdb->flags & TDB_INTERNAL))
1081                 tdb_munmap(tdb);
1082
1083         /*
1084          * We must ensure the file is unmapped before doing this
1085          * to ensure consistency with systems like OpenBSD where
1086          * writes and mmaps are not consistent.
1087          */
1088
1089         /* expand the file itself */
1090         if (!(tdb->flags & TDB_INTERNAL)) {
1091                 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1092                         goto fail;
1093         }
1094
1095         tdb->map_size += size;
1096
1097         if (tdb->flags & TDB_INTERNAL) {
1098                 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1099                                                     tdb->map_size);
1100                 if (!new_map_ptr) {
1101                         tdb->map_size -= size;
1102                         goto fail;
1103                 }
1104                 tdb->map_ptr = new_map_ptr;
1105         } else {
1106                 /*
1107                  * We must ensure the file is remapped before adding the space
1108                  * to ensure consistency with systems like OpenBSD where
1109                  * writes and mmaps are not consistent.
1110                  */
1111
1112                 /* We're ok if the mmap fails as we'll fallback to read/write */
1113                 tdb_mmap(tdb);
1114         }
1115
1116         /* form a new freelist record */
1117         memset(&rec,'\0',sizeof(rec));
1118         rec.rec_len = size - sizeof(rec);
1119
1120         /* link it into the free list */
1121         offset = tdb->map_size - size;
1122         if (tdb_free(tdb, offset, &rec) == -1)
1123                 goto fail;
1124
1125         tdb_unlock(tdb, -1, F_WRLCK);
1126         return 0;
1127  fail:
1128         tdb_unlock(tdb, -1, F_WRLCK);
1129         return -1;
1130 }
1131
1132 /* read/write a tdb_off_t */
1133 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1134 {
1135         return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1136 }
1137
1138 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1139 {
1140         tdb_off_t off = *d;
1141         return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1142 }
1143
1144
1145 /* read a lump of data, allocating the space for it */
1146 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1147 {
1148         unsigned char *buf;
1149
1150         /* some systems don't like zero length malloc */
1151         if (len == 0) {
1152                 len = 1;
1153         }
1154
1155         if (!(buf = (unsigned char *)malloc(len))) {
1156                 /* Ensure ecode is set for log fn. */
1157                 tdb->ecode = TDB_ERR_OOM;
1158                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1159                            len, strerror(errno)));
1160                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1161         }
1162         if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1163                 SAFE_FREE(buf);
1164                 return NULL;
1165         }
1166         return buf;
1167 }
1168
1169 /* Give a piece of tdb data to a parser */
1170
1171 int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1172                    tdb_off_t offset, tdb_len_t len,
1173                    int (*parser)(TDB_DATA key, TDB_DATA data,
1174                                  void *private_data),
1175                    void *private_data)
1176 {
1177         TDB_DATA data;
1178         int result;
1179
1180         data.dsize = len;
1181
1182         if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1183                 /*
1184                  * Optimize by avoiding the malloc/memcpy/free, point the
1185                  * parser directly at the mmap area.
1186                  */
1187                 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1188                         return -1;
1189                 }
1190                 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1191                 return parser(key, data, private_data);
1192         }
1193
1194         if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1195                 return -1;
1196         }
1197
1198         result = parser(key, data, private_data);
1199         free(data.dptr);
1200         return result;
1201 }
1202
1203 /* read/write a record */
1204 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1205 {
1206         if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1207                 return -1;
1208         if (TDB_BAD_MAGIC(rec)) {
1209                 /* Ensure ecode is set for log fn. */
1210                 tdb->ecode = TDB_ERR_CORRUPT;
1211                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1212                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1213         }
1214         return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1215 }
1216
1217 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1218 {
1219         struct list_struct r = *rec;
1220         return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1221 }
1222
1223 static const struct tdb_methods io_methods = {
1224         tdb_read,
1225         tdb_write,
1226         tdb_next_hash_chain,
1227         tdb_oob,
1228         tdb_expand_file,
1229         tdb_brlock
1230 };
1231
1232 /*
1233   initialise the default methods table
1234 */
1235 void tdb_io_init(struct tdb_context *tdb)
1236 {
1237         tdb->methods = &io_methods;
1238 }
1239
1240 /* file: transaction.c */
1241
1242 /*
1243   transaction design:
1244
1245   - only allow a single transaction at a time per database. This makes
1246     using the transaction API simpler, as otherwise the caller would
1247     have to cope with temporary failures in transactions that conflict
1248     with other current transactions
1249
1250   - keep the transaction recovery information in the same file as the
1251     database, using a special 'transaction recovery' record pointed at
1252     by the header. This removes the need for extra journal files as
1253     used by some other databases
1254
1255   - dynamically allocated the transaction recover record, re-using it
1256     for subsequent transactions. If a larger record is needed then
1257     tdb_free() the old record to place it on the normal tdb freelist
1258     before allocating the new record
1259
1260   - during transactions, keep a linked list of writes all that have
1261     been performed by intercepting all tdb_write() calls. The hooked
1262     transaction versions of tdb_read() and tdb_write() check this
1263     linked list and try to use the elements of the list in preference
1264     to the real database.
1265
1266   - don't allow any locks to be held when a transaction starts,
1267     otherwise we can end up with deadlock (plus lack of lock nesting
1268     in posix locks would mean the lock is lost)
1269
1270   - if the caller gains a lock during the transaction but doesn't
1271     release it then fail the commit
1272
1273   - allow for nested calls to tdb_transaction_start(), re-using the
1274     existing transaction record. If the inner transaction is cancelled
1275     then a subsequent commit will fail
1276
1277   - keep a mirrored copy of the tdb hash chain heads to allow for the
1278     fast hash heads scan on traverse, updating the mirrored copy in
1279     the transaction version of tdb_write
1280
1281   - allow callers to mix transaction and non-transaction use of tdb,
1282     although once a transaction is started then an exclusive lock is
1283     gained until the transaction is committed or cancelled
1284
1285   - the commit strategy involves first saving away all modified data
1286     into a linearised buffer in the transaction recovery area, then
1287     marking the transaction recovery area with a magic value to
1288     indicate a valid recovery record. In total 4 fsync/msync calls are
1289     needed per commit to prevent race conditions. It might be possible
1290     to reduce this to 3 or even 2 with some more work.
1291
1292   - check for a valid recovery record on open of the tdb, while the
1293     global lock is held. Automatically recover from the transaction
1294     recovery area if needed, then continue with the open as
1295     usual. This allows for smooth crash recovery with no administrator
1296     intervention.
1297
1298   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1299     still available, but no transaction recovery area is used and no
1300     fsync/msync calls are made.
1301
1302 */
1303
1304 struct tdb_transaction_el {
1305         struct tdb_transaction_el *next, *prev;
1306         tdb_off_t offset;
1307         tdb_len_t length;
1308         unsigned char *data;
1309 };
1310
1311 /*
1312   hold the context of any current transaction
1313 */
1314 struct tdb_transaction {
1315         /* we keep a mirrored copy of the tdb hash heads here so
1316            tdb_next_hash_chain() can operate efficiently */
1317         u32 *hash_heads;
1318
1319         /* the original io methods - used to do IOs to the real db */
1320         const struct tdb_methods *io_methods;
1321
1322         /* the list of transaction elements. We use a doubly linked
1323            list with a last pointer to allow us to keep the list
1324            ordered, with first element at the front of the list. It
1325            needs to be doubly linked as the read/write traversals need
1326            to be backwards, while the commit needs to be forwards */
1327         struct tdb_transaction_el *elements, *elements_last;
1328
1329         /* non-zero when an internal transaction error has
1330            occurred. All write operations will then fail until the
1331            transaction is ended */
1332         int transaction_error;
1333
1334         /* when inside a transaction we need to keep track of any
1335            nested tdb_transaction_start() calls, as these are allowed,
1336            but don't create a new transaction */
1337         int nesting;
1338
1339         /* old file size before transaction */
1340         tdb_len_t old_map_size;
1341 };
1342
1343
1344 /*
1345   read while in a transaction. We need to check first if the data is in our list
1346   of transaction elements, then if not do a real read
1347 */
1348 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1349                             tdb_len_t len, int cv)
1350 {
1351         struct tdb_transaction_el *el;
1352
1353         /* we need to walk the list backwards to get the most recent data */
1354         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1355                 tdb_len_t partial;
1356
1357                 if (off+len <= el->offset) {
1358                         continue;
1359                 }
1360                 if (off >= el->offset + el->length) {
1361                         continue;
1362                 }
1363
1364                 /* an overlapping read - needs to be split into up to
1365                    2 reads and a memcpy */
1366                 if (off < el->offset) {
1367                         partial = el->offset - off;
1368                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1369                                 goto fail;
1370                         }
1371                         len -= partial;
1372                         off += partial;
1373                         buf = (void *)(partial + (char *)buf);
1374                 }
1375                 if (off + len <= el->offset + el->length) {
1376                         partial = len;
1377                 } else {
1378                         partial = el->offset + el->length - off;
1379                 }
1380                 memcpy(buf, el->data + (off - el->offset), partial);
1381                 if (cv) {
1382                         tdb_convert(buf, len);
1383                 }
1384                 len -= partial;
1385                 off += partial;
1386                 buf = (void *)(partial + (char *)buf);
1387
1388                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1389                         goto fail;
1390                 }
1391
1392                 return 0;
1393         }
1394
1395         /* its not in the transaction elements - do a real read */
1396         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1397
1398 fail:
1399         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1400         tdb->ecode = TDB_ERR_IO;
1401         tdb->transaction->transaction_error = 1;
1402         return -1;
1403 }
1404
1405
1406 /*
1407   write while in a transaction
1408 */
1409 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1410                              const void *buf, tdb_len_t len)
1411 {
1412         struct tdb_transaction_el *el, *best_el=NULL;
1413
1414         if (len == 0) {
1415                 return 0;
1416         }
1417
1418         /* if the write is to a hash head, then update the transaction
1419            hash heads */
1420         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1421             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1422                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1423                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1424         }
1425
1426         /* first see if we can replace an existing entry */
1427         for (el=tdb->transaction->elements_last;el;el=el->prev) {
1428                 tdb_len_t partial;
1429
1430                 if (best_el == NULL && off == el->offset+el->length) {
1431                         best_el = el;
1432                 }
1433
1434                 if (off+len <= el->offset) {
1435                         continue;
1436                 }
1437                 if (off >= el->offset + el->length) {
1438                         continue;
1439                 }
1440
1441                 /* an overlapping write - needs to be split into up to
1442                    2 writes and a memcpy */
1443                 if (off < el->offset) {
1444                         partial = el->offset - off;
1445                         if (transaction_write(tdb, off, buf, partial) != 0) {
1446                                 goto fail;
1447                         }
1448                         len -= partial;
1449                         off += partial;
1450                         buf = (const void *)(partial + (const char *)buf);
1451                 }
1452                 if (off + len <= el->offset + el->length) {
1453                         partial = len;
1454                 } else {
1455                         partial = el->offset + el->length - off;
1456                 }
1457                 memcpy(el->data + (off - el->offset), buf, partial);
1458                 len -= partial;
1459                 off += partial;
1460                 buf = (const void *)(partial + (const char *)buf);
1461
1462                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1463                         goto fail;
1464                 }
1465
1466                 return 0;
1467         }
1468
1469         /* see if we can append the new entry to an existing entry */
1470         if (best_el && best_el->offset + best_el->length == off &&
1471             (off+len < tdb->transaction->old_map_size ||
1472              off > tdb->transaction->old_map_size)) {
1473                 unsigned char *data = best_el->data;
1474                 el = best_el;
1475                 el->data = (unsigned char *)realloc(el->data,
1476                                                     el->length + len);
1477                 if (el->data == NULL) {
1478                         tdb->ecode = TDB_ERR_OOM;
1479                         tdb->transaction->transaction_error = 1;
1480                         el->data = data;
1481                         return -1;
1482                 }
1483                 if (buf) {
1484                         memcpy(el->data + el->length, buf, len);
1485                 } else {
1486                         memset(el->data + el->length, TDB_PAD_BYTE, len);
1487                 }
1488                 el->length += len;
1489                 return 0;
1490         }
1491
1492         /* add a new entry at the end of the list */
1493         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1494         if (el == NULL) {
1495                 tdb->ecode = TDB_ERR_OOM;
1496                 tdb->transaction->transaction_error = 1;
1497                 return -1;
1498         }
1499         el->next = NULL;
1500         el->prev = tdb->transaction->elements_last;
1501         el->offset = off;
1502         el->length = len;
1503         el->data = (unsigned char *)malloc(len);
1504         if (el->data == NULL) {
1505                 free(el);
1506                 tdb->ecode = TDB_ERR_OOM;
1507                 tdb->transaction->transaction_error = 1;
1508                 return -1;
1509         }
1510         if (buf) {
1511                 memcpy(el->data, buf, len);
1512         } else {
1513                 memset(el->data, TDB_PAD_BYTE, len);
1514         }
1515         if (el->prev) {
1516                 el->prev->next = el;
1517         } else {
1518                 tdb->transaction->elements = el;
1519         }
1520         tdb->transaction->elements_last = el;
1521         return 0;
1522
1523 fail:
1524         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1525         tdb->ecode = TDB_ERR_IO;
1526         tdb->transaction->transaction_error = 1;
1527         return -1;
1528 }
1529
1530 /*
1531   accelerated hash chain head search, using the cached hash heads
1532 */
1533 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1534 {
1535         u32 h = *chain;
1536         for (;h < tdb->header.hash_size;h++) {
1537                 /* the +1 takes account of the freelist */
1538                 if (0 != tdb->transaction->hash_heads[h+1]) {
1539                         break;
1540                 }
1541         }
1542         (*chain) = h;
1543 }
1544
1545 /*
1546   out of bounds check during a transaction
1547 */
1548 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len,
1549                            int probe EXT2FS_ATTR((unused)))
1550 {
1551         if (len <= tdb->map_size) {
1552                 return 0;
1553         }
1554         return TDB_ERRCODE(TDB_ERR_IO, -1);
1555 }
1556
1557 /*
1558   transaction version of tdb_expand().
1559 */
1560 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1561                                    tdb_off_t addition)
1562 {
1563         /* add a write to the transaction elements, so subsequent
1564            reads see the zero data */
1565         if (transaction_write(tdb, size, NULL, addition) != 0) {
1566                 return -1;
1567         }
1568
1569         return 0;
1570 }
1571
1572 /*
1573   brlock during a transaction - ignore them
1574 */
1575 static int transaction_brlock(struct tdb_context *tdb EXT2FS_ATTR((unused)),
1576                               tdb_off_t offset EXT2FS_ATTR((unused)),
1577                               int rw_type EXT2FS_ATTR((unused)),
1578                               int lck_type EXT2FS_ATTR((unused)),
1579                               int probe EXT2FS_ATTR((unused)),
1580                               size_t len EXT2FS_ATTR((unused)))
1581 {
1582         return 0;
1583 }
1584
1585 static const struct tdb_methods transaction_methods = {
1586         transaction_read,
1587         transaction_write,
1588         transaction_next_hash_chain,
1589         transaction_oob,
1590         transaction_expand_file,
1591         transaction_brlock
1592 };
1593
1594
1595 /*
1596   start a tdb transaction. No token is returned, as only a single
1597   transaction is allowed to be pending per tdb_context
1598 */
1599 int tdb_transaction_start(struct tdb_context *tdb)
1600 {
1601         /* some sanity checks */
1602         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1603                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1604                 tdb->ecode = TDB_ERR_EINVAL;
1605                 return -1;
1606         }
1607
1608         /* cope with nested tdb_transaction_start() calls */
1609         if (tdb->transaction != NULL) {
1610                 tdb->transaction->nesting++;
1611                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1612                          tdb->transaction->nesting));
1613                 return 0;
1614         }
1615
1616         if (tdb->num_locks != 0 || tdb->global_lock.count) {
1617                 /* the caller must not have any locks when starting a
1618                    transaction as otherwise we'll be screwed by lack
1619                    of nested locks in posix */
1620                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1621                 tdb->ecode = TDB_ERR_LOCK;
1622                 return -1;
1623         }
1624
1625         if (tdb->travlocks.next != NULL) {
1626                 /* you cannot use transactions inside a traverse (although you can use
1627                    traverse inside a transaction) as otherwise you can end up with
1628                    deadlock */
1629                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1630                 tdb->ecode = TDB_ERR_LOCK;
1631                 return -1;
1632         }
1633
1634         tdb->transaction = (struct tdb_transaction *)
1635                 calloc(sizeof(struct tdb_transaction), 1);
1636         if (tdb->transaction == NULL) {
1637                 tdb->ecode = TDB_ERR_OOM;
1638                 return -1;
1639         }
1640
1641         /* get the transaction write lock. This is a blocking lock. As
1642            discussed with Volker, there are a number of ways we could
1643            make this async, which we will probably do in the future */
1644         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1645                 SAFE_FREE(tdb->transaction);
1646                 return -1;
1647         }
1648
1649         /* get a read lock from the freelist to the end of file. This
1650            is upgraded to a write lock during the commit */
1651         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1652                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1653                 tdb->ecode = TDB_ERR_LOCK;
1654                 goto fail;
1655         }
1656
1657         /* setup a copy of the hash table heads so the hash scan in
1658            traverse can be fast */
1659         tdb->transaction->hash_heads = (u32 *)
1660                 calloc(tdb->header.hash_size+1, sizeof(u32));
1661         if (tdb->transaction->hash_heads == NULL) {
1662                 tdb->ecode = TDB_ERR_OOM;
1663                 goto fail;
1664         }
1665         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1666                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1667                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1668                 tdb->ecode = TDB_ERR_IO;
1669                 goto fail;
1670         }
1671
1672         /* make sure we know about any file expansions already done by
1673            anyone else */
1674         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1675         tdb->transaction->old_map_size = tdb->map_size;
1676
1677         /* finally hook the io methods, replacing them with
1678            transaction specific methods */
1679         tdb->transaction->io_methods = tdb->methods;
1680         tdb->methods = &transaction_methods;
1681
1682         /* by calling this transaction write here, we ensure that we don't grow the
1683            transaction linked list due to hash table updates */
1684         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1685                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
1686                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1687                 tdb->ecode = TDB_ERR_IO;
1688                 tdb->methods = tdb->transaction->io_methods;
1689                 goto fail;
1690         }
1691
1692         return 0;
1693
1694 fail:
1695         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1696         tdb_transaction_unlock(tdb);
1697         SAFE_FREE(tdb->transaction->hash_heads);
1698         SAFE_FREE(tdb->transaction);
1699         return -1;
1700 }
1701
1702
1703 /*
1704   cancel the current transaction
1705 */
1706 int tdb_transaction_cancel(struct tdb_context *tdb)
1707 {
1708         if (tdb->transaction == NULL) {
1709                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1710                 return -1;
1711         }
1712
1713         if (tdb->transaction->nesting != 0) {
1714                 tdb->transaction->transaction_error = 1;
1715                 tdb->transaction->nesting--;
1716                 return 0;
1717         }
1718
1719         tdb->map_size = tdb->transaction->old_map_size;
1720
1721         /* free all the transaction elements */
1722         while (tdb->transaction->elements) {
1723                 struct tdb_transaction_el *el = tdb->transaction->elements;
1724                 tdb->transaction->elements = el->next;
1725                 free(el->data);
1726                 free(el);
1727         }
1728
1729         /* remove any global lock created during the transaction */
1730         if (tdb->global_lock.count != 0) {
1731                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1732                 tdb->global_lock.count = 0;
1733         }
1734
1735         /* remove any locks created during the transaction */
1736         if (tdb->num_locks != 0) {
1737                 int i;
1738                 for (i=0;i<tdb->num_lockrecs;i++) {
1739                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1740                                    F_UNLCK,F_SETLKW, 0, 1);
1741                 }
1742                 tdb->num_locks = 0;
1743                 tdb->num_lockrecs = 0;
1744                 SAFE_FREE(tdb->lockrecs);
1745         }
1746
1747         /* restore the normal io methods */
1748         tdb->methods = tdb->transaction->io_methods;
1749
1750         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1751         tdb_transaction_unlock(tdb);
1752         SAFE_FREE(tdb->transaction->hash_heads);
1753         SAFE_FREE(tdb->transaction);
1754
1755         return 0;
1756 }
1757
1758 /*
1759   sync to disk
1760 */
1761 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1762 {
1763         if (fsync(tdb->fd) != 0) {
1764                 tdb->ecode = TDB_ERR_IO;
1765                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1766                 return -1;
1767         }
1768 #if defined(HAVE_MSYNC) && defined(MS_SYNC)
1769         if (tdb->map_ptr) {
1770                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1771                 if (msync(moffset + (char *)tdb->map_ptr,
1772                           length + (offset - moffset), MS_SYNC) != 0) {
1773                         tdb->ecode = TDB_ERR_IO;
1774                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1775                                  strerror(errno)));
1776                         return -1;
1777                 }
1778         }
1779 #endif
1780         return 0;
1781 }
1782
1783
1784 /*
1785   work out how much space the linearised recovery data will consume
1786 */
1787 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1788 {
1789         struct tdb_transaction_el *el;
1790         tdb_len_t recovery_size = 0;
1791
1792         recovery_size = sizeof(u32);
1793         for (el=tdb->transaction->elements;el;el=el->next) {
1794                 if (el->offset >= tdb->transaction->old_map_size) {
1795                         continue;
1796                 }
1797                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1798         }
1799
1800         return recovery_size;
1801 }
1802
1803 /*
1804   allocate the recovery area, or use an existing recovery area if it is
1805   large enough
1806 */
1807 static int tdb_recovery_allocate(struct tdb_context *tdb,
1808                                  tdb_len_t *recovery_size,
1809                                  tdb_off_t *recovery_offset,
1810                                  tdb_len_t *recovery_max_size)
1811 {
1812         struct list_struct rec;
1813         const struct tdb_methods *methods = tdb->transaction->io_methods;
1814         tdb_off_t recovery_head;
1815
1816         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1817                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1818                 return -1;
1819         }
1820
1821         rec.rec_len = 0;
1822
1823         if (recovery_head != 0 &&
1824             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1825                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1826                 return -1;
1827         }
1828
1829         *recovery_size = tdb_recovery_size(tdb);
1830
1831         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1832                 /* it fits in the existing area */
1833                 *recovery_max_size = rec.rec_len;
1834                 *recovery_offset = recovery_head;
1835                 return 0;
1836         }
1837
1838         /* we need to free up the old recovery area, then allocate a
1839            new one at the end of the file. Note that we cannot use
1840            tdb_allocate() to allocate the new one as that might return
1841            us an area that is being currently used (as of the start of
1842            the transaction) */
1843         if (recovery_head != 0) {
1844                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1845                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1846                         return -1;
1847                 }
1848         }
1849
1850         /* the tdb_free() call might have increased the recovery size */
1851         *recovery_size = tdb_recovery_size(tdb);
1852
1853         /* round up to a multiple of page size */
1854         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1855         *recovery_offset = tdb->map_size;
1856         recovery_head = *recovery_offset;
1857
1858         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1859                                      (tdb->map_size - tdb->transaction->old_map_size) +
1860                                      sizeof(rec) + *recovery_max_size) == -1) {
1861                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1862                 return -1;
1863         }
1864
1865         /* remap the file (if using mmap) */
1866         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1867
1868         /* we have to reset the old map size so that we don't try to expand the file
1869            again in the transaction commit, which would destroy the recovery area */
1870         tdb->transaction->old_map_size = tdb->map_size;
1871
1872         /* write the recovery header offset and sync - we can sync without a race here
1873            as the magic ptr in the recovery record has not been set */
1874         CONVERT(recovery_head);
1875         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1876                                &recovery_head, sizeof(tdb_off_t)) == -1) {
1877                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1878                 return -1;
1879         }
1880
1881         return 0;
1882 }
1883
1884
1885 /*
1886   setup the recovery data that will be used on a crash during commit
1887 */
1888 static int transaction_setup_recovery(struct tdb_context *tdb,
1889                                       tdb_off_t *magic_offset)
1890 {
1891         struct tdb_transaction_el *el;
1892         tdb_len_t recovery_size;
1893         unsigned char *data, *p;
1894         const struct tdb_methods *methods = tdb->transaction->io_methods;
1895         struct list_struct *rec;
1896         tdb_off_t recovery_offset, recovery_max_size;
1897         tdb_off_t old_map_size = tdb->transaction->old_map_size;
1898         u32 magic, tailer;
1899
1900         /*
1901           check that the recovery area has enough space
1902         */
1903         if (tdb_recovery_allocate(tdb, &recovery_size,
1904                                   &recovery_offset, &recovery_max_size) == -1) {
1905                 return -1;
1906         }
1907
1908         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1909         if (data == NULL) {
1910                 tdb->ecode = TDB_ERR_OOM;
1911                 return -1;
1912         }
1913
1914         rec = (struct list_struct *)data;
1915         memset(rec, 0, sizeof(*rec));
1916
1917         rec->magic    = 0;
1918         rec->data_len = recovery_size;
1919         rec->rec_len  = recovery_max_size;
1920         rec->key_len  = old_map_size;
1921         CONVERT(rec);
1922
1923         /* build the recovery data into a single blob to allow us to do a single
1924            large write, which should be more efficient */
1925         p = data + sizeof(*rec);
1926         for (el=tdb->transaction->elements;el;el=el->next) {
1927                 if (el->offset >= old_map_size) {
1928                         continue;
1929                 }
1930                 if (el->offset + el->length > tdb->transaction->old_map_size) {
1931                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1932                         free(data);
1933                         tdb->ecode = TDB_ERR_CORRUPT;
1934                         return -1;
1935                 }
1936                 memcpy(p, &el->offset, 4);
1937                 memcpy(p+4, &el->length, 4);
1938                 if (DOCONV()) {
1939                         tdb_convert(p, 8);
1940                 }
1941                 /* the recovery area contains the old data, not the
1942                    new data, so we have to call the original tdb_read
1943                    method to get it */
1944                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1945                         free(data);
1946                         tdb->ecode = TDB_ERR_IO;
1947                         return -1;
1948                 }
1949                 p += 8 + el->length;
1950         }
1951
1952         /* and the tailer */
1953         tailer = sizeof(*rec) + recovery_max_size;
1954         memcpy(p, &tailer, 4);
1955         CONVERT(p);
1956
1957         /* write the recovery data to the recovery area */
1958         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1959                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1960                 free(data);
1961                 tdb->ecode = TDB_ERR_IO;
1962                 return -1;
1963         }
1964
1965         /* as we don't have ordered writes, we have to sync the recovery
1966            data before we update the magic to indicate that the recovery
1967            data is present */
1968         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1969                 free(data);
1970                 return -1;
1971         }
1972
1973         free(data);
1974
1975         magic = TDB_RECOVERY_MAGIC;
1976         CONVERT(magic);
1977
1978         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1979
1980         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1981                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1982                 tdb->ecode = TDB_ERR_IO;
1983                 return -1;
1984         }
1985
1986         /* ensure the recovery magic marker is on disk */
1987         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1988                 return -1;
1989         }
1990
1991         return 0;
1992 }
1993
1994 /*
1995   commit the current transaction
1996 */
1997 int tdb_transaction_commit(struct tdb_context *tdb)
1998 {
1999         const struct tdb_methods *methods;
2000         tdb_off_t magic_offset = 0;
2001         u32 zero = 0;
2002
2003         if (tdb->transaction == NULL) {
2004                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
2005                 return -1;
2006         }
2007
2008         if (tdb->transaction->transaction_error) {
2009                 tdb->ecode = TDB_ERR_IO;
2010                 tdb_transaction_cancel(tdb);
2011                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
2012                 return -1;
2013         }
2014
2015         if (tdb->transaction->nesting != 0) {
2016                 tdb->transaction->nesting--;
2017                 return 0;
2018         }
2019
2020         /* check for a null transaction */
2021         if (tdb->transaction->elements == NULL) {
2022                 tdb_transaction_cancel(tdb);
2023                 return 0;
2024         }
2025
2026         methods = tdb->transaction->io_methods;
2027
2028         /* if there are any locks pending then the caller has not
2029            nested their locks properly, so fail the transaction */
2030         if (tdb->num_locks || tdb->global_lock.count) {
2031                 tdb->ecode = TDB_ERR_LOCK;
2032                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2033                 tdb_transaction_cancel(tdb);
2034                 return -1;
2035         }
2036
2037         /* upgrade the main transaction lock region to a write lock */
2038         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2039                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2040                 tdb->ecode = TDB_ERR_LOCK;
2041                 tdb_transaction_cancel(tdb);
2042                 return -1;
2043         }
2044
2045         /* get the global lock - this prevents new users attaching to the database
2046            during the commit */
2047         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2048                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2049                 tdb->ecode = TDB_ERR_LOCK;
2050                 tdb_transaction_cancel(tdb);
2051                 return -1;
2052         }
2053
2054         if (!(tdb->flags & TDB_NOSYNC)) {
2055                 /* write the recovery data to the end of the file */
2056                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2057                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2058                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2059                         tdb_transaction_cancel(tdb);
2060                         return -1;
2061                 }
2062         }
2063
2064         /* expand the file to the new size if needed */
2065         if (tdb->map_size != tdb->transaction->old_map_size) {
2066                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2067                                              tdb->map_size -
2068                                              tdb->transaction->old_map_size) == -1) {
2069                         tdb->ecode = TDB_ERR_IO;
2070                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2071                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2072                         tdb_transaction_cancel(tdb);
2073                         return -1;
2074                 }
2075                 tdb->map_size = tdb->transaction->old_map_size;
2076                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2077         }
2078
2079         /* perform all the writes */
2080         while (tdb->transaction->elements) {
2081                 struct tdb_transaction_el *el = tdb->transaction->elements;
2082
2083                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2084                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2085
2086                         /* we've overwritten part of the data and
2087                            possibly expanded the file, so we need to
2088                            run the crash recovery code */
2089                         tdb->methods = methods;
2090                         tdb_transaction_recover(tdb);
2091
2092                         tdb_transaction_cancel(tdb);
2093                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2094
2095                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2096                         return -1;
2097                 }
2098                 tdb->transaction->elements = el->next;
2099                 free(el->data);
2100                 free(el);
2101         }
2102
2103         if (!(tdb->flags & TDB_NOSYNC)) {
2104                 /* ensure the new data is on disk */
2105                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2106                         return -1;
2107                 }
2108
2109                 /* remove the recovery marker */
2110                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2111                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2112                         return -1;
2113                 }
2114
2115                 /* ensure the recovery marker has been removed on disk */
2116                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
2117                         return -1;
2118                 }
2119         }
2120
2121         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2122
2123         /*
2124           TODO: maybe write to some dummy hdr field, or write to magic
2125           offset without mmap, before the last sync, instead of the
2126           utime() call
2127         */
2128
2129         /* on some systems (like Linux 2.6.x) changes via mmap/msync
2130            don't change the mtime of the file, this means the file may
2131            not be backed up (as tdb rounding to block sizes means that
2132            file size changes are quite rare too). The following forces
2133            mtime changes when a transaction completes */
2134 #ifdef HAVE_UTIME
2135         utime(tdb->name, NULL);
2136 #endif
2137
2138         /* use a transaction cancel to free memory and remove the
2139            transaction locks */
2140         tdb_transaction_cancel(tdb);
2141         return 0;
2142 }
2143
2144
2145 /*
2146   recover from an aborted transaction. Must be called with exclusive
2147   database write access already established (including the global
2148   lock to prevent new processes attaching)
2149 */
2150 int tdb_transaction_recover(struct tdb_context *tdb)
2151 {
2152         tdb_off_t recovery_head, recovery_eof;
2153         unsigned char *data, *p;
2154         u32 zero = 0;
2155         struct list_struct rec;
2156
2157         /* find the recovery area */
2158         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2159                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2160                 tdb->ecode = TDB_ERR_IO;
2161                 return -1;
2162         }
2163
2164         if (recovery_head == 0) {
2165                 /* we have never allocated a recovery record */
2166                 return 0;
2167         }
2168
2169         /* read the recovery record */
2170         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2171                                    sizeof(rec), DOCONV()) == -1) {
2172                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2173                 tdb->ecode = TDB_ERR_IO;
2174                 return -1;
2175         }
2176
2177         if (rec.magic != TDB_RECOVERY_MAGIC) {
2178                 /* there is no valid recovery data */
2179                 return 0;
2180         }
2181
2182         if (tdb->read_only) {
2183                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2184                 tdb->ecode = TDB_ERR_CORRUPT;
2185                 return -1;
2186         }
2187
2188         recovery_eof = rec.key_len;
2189
2190         data = (unsigned char *)malloc(rec.data_len);
2191         if (data == NULL) {
2192                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2193                 tdb->ecode = TDB_ERR_OOM;
2194                 return -1;
2195         }
2196
2197         /* read the full recovery data */
2198         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2199                                    rec.data_len, 0) == -1) {
2200                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2201                 tdb->ecode = TDB_ERR_IO;
2202                 free(data);
2203                 return -1;
2204         }
2205
2206         /* recover the file data */
2207         p = data;
2208         while (p+8 < data + rec.data_len) {
2209                 u32 ofs, len;
2210                 if (DOCONV()) {
2211                         tdb_convert(p, 8);
2212                 }
2213                 memcpy(&ofs, p, 4);
2214                 memcpy(&len, p+4, 4);
2215
2216                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2217                         free(data);
2218                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2219                         tdb->ecode = TDB_ERR_IO;
2220                         return -1;
2221                 }
2222                 p += 8 + len;
2223         }
2224
2225         free(data);
2226
2227         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2228                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2229                 tdb->ecode = TDB_ERR_IO;
2230                 return -1;
2231         }
2232
2233         /* if the recovery area is after the recovered eof then remove it */
2234         if (recovery_eof <= recovery_head) {
2235                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2236                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2237                         tdb->ecode = TDB_ERR_IO;
2238                         return -1;
2239                 }
2240         }
2241
2242         /* remove the recovery magic */
2243         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2244                           &zero) == -1) {
2245                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2246                 tdb->ecode = TDB_ERR_IO;
2247                 return -1;
2248         }
2249
2250         /* reduce the file size to the old size */
2251         tdb_munmap(tdb);
2252         if (ftruncate(tdb->fd, recovery_eof) != 0) {
2253                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2254                 tdb->ecode = TDB_ERR_IO;
2255                 return -1;
2256         }
2257         tdb->map_size = recovery_eof;
2258         tdb_mmap(tdb);
2259
2260         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2261                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2262                 tdb->ecode = TDB_ERR_IO;
2263                 return -1;
2264         }
2265
2266         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2267                  recovery_eof));
2268
2269         /* all done */
2270         return 0;
2271 }
2272
2273 /* file: freelist.c */
2274
2275 /* read a freelist record and check for simple errors */
2276 static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2277 {
2278         if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2279                 return -1;
2280
2281         if (rec->magic == TDB_MAGIC) {
2282                 /* this happens when a app is showdown while deleting a record - we should
2283                    not completely fail when this happens */
2284                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2285                          rec->magic, off));
2286                 rec->magic = TDB_FREE_MAGIC;
2287                 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2288                         return -1;
2289         }
2290
2291         if (rec->magic != TDB_FREE_MAGIC) {
2292                 /* Ensure ecode is set for log fn. */
2293                 tdb->ecode = TDB_ERR_CORRUPT;
2294                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2295                            rec->magic, off));
2296                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2297         }
2298         if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2299                 return -1;
2300         return 0;
2301 }
2302
2303
2304
2305 /* Remove an element from the freelist.  Must have alloc lock. */
2306 static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2307 {
2308         tdb_off_t last_ptr, i;
2309
2310         /* read in the freelist top */
2311         last_ptr = FREELIST_TOP;
2312         while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2313                 if (i == off) {
2314                         /* We've found it! */
2315                         return tdb_ofs_write(tdb, last_ptr, &next);
2316                 }
2317                 /* Follow chain (next offset is at start of record) */
2318                 last_ptr = i;
2319         }
2320         TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2321         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2322 }
2323
2324
2325 /* update a record tailer (must hold allocation lock) */
2326 static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2327                          const struct list_struct *rec)
2328 {
2329         tdb_off_t totalsize;
2330
2331         /* Offset of tailer from record header */
2332         totalsize = sizeof(*rec) + rec->rec_len;
2333         return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2334                          &totalsize);
2335 }
2336
2337 /* Add an element into the freelist. Merge adjacent records if
2338    necessary. */
2339 int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2340 {
2341         tdb_off_t right, left;
2342
2343         /* Allocation and tailer lock */
2344         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2345                 return -1;
2346
2347         /* set an initial tailer, so if we fail we don't leave a bogus record */
2348         if (update_tailer(tdb, offset, rec) != 0) {
2349                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2350                 goto fail;
2351         }
2352
2353         /* Look right first (I'm an Australian, dammit) */
2354         right = offset + sizeof(*rec) + rec->rec_len;
2355         if (right + sizeof(*rec) <= tdb->map_size) {
2356                 struct list_struct r;
2357
2358                 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2359                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2360                         goto left;
2361                 }
2362
2363                 /* If it's free, expand to include it. */
2364                 if (r.magic == TDB_FREE_MAGIC) {
2365                         if (remove_from_freelist(tdb, right, r.next) == -1) {
2366                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2367                                 goto left;
2368                         }
2369                         rec->rec_len += sizeof(r) + r.rec_len;
2370                 }
2371         }
2372
2373 left:
2374         /* Look left */
2375         left = offset - sizeof(tdb_off_t);
2376         if (left > TDB_DATA_START(tdb->header.hash_size)) {
2377                 struct list_struct l;
2378                 tdb_off_t leftsize;
2379
2380                 /* Read in tailer and jump back to header */
2381                 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2382                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2383                         goto update;
2384                 }
2385
2386                 /* it could be uninitialised data */
2387                 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2388                         goto update;
2389                 }
2390
2391                 left = offset - leftsize;
2392
2393                 /* Now read in record */
2394                 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2395                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2396                         goto update;
2397                 }
2398
2399                 /* If it's free, expand to include it. */
2400                 if (l.magic == TDB_FREE_MAGIC) {
2401                         if (remove_from_freelist(tdb, left, l.next) == -1) {
2402                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2403                                 goto update;
2404                         } else {
2405                                 offset = left;
2406                                 rec->rec_len += leftsize;
2407                         }
2408                 }
2409         }
2410
2411 update:
2412         if (update_tailer(tdb, offset, rec) == -1) {
2413                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2414                 goto fail;
2415         }
2416
2417         /* Now, prepend to free list */
2418         rec->magic = TDB_FREE_MAGIC;
2419
2420         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2421             tdb_rec_write(tdb, offset, rec) == -1 ||
2422             tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2423                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2424                 goto fail;
2425         }
2426
2427         /* And we're done. */
2428         tdb_unlock(tdb, -1, F_WRLCK);
2429         return 0;
2430
2431  fail:
2432         tdb_unlock(tdb, -1, F_WRLCK);
2433         return -1;
2434 }
2435
2436
2437 /*
2438    the core of tdb_allocate - called when we have decided which
2439    free list entry to use
2440  */
2441 static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2442                                 struct list_struct *rec, tdb_off_t last_ptr)
2443 {
2444         struct list_struct newrec;
2445         tdb_off_t newrec_ptr;
2446
2447         memset(&newrec, '\0', sizeof(newrec));
2448
2449         /* found it - now possibly split it up  */
2450         if (rec->rec_len > length + MIN_REC_SIZE) {
2451                 /* Length of left piece */
2452                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2453
2454                 /* Right piece to go on free list */
2455                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2456                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2457
2458                 /* And left record is shortened */
2459                 rec->rec_len = length;
2460         } else {
2461                 newrec_ptr = 0;
2462         }
2463
2464         /* Remove allocated record from the free list */
2465         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2466                 return 0;
2467         }
2468
2469         /* Update header: do this before we drop alloc
2470            lock, otherwise tdb_free() might try to
2471            merge with us, thinking we're free.
2472            (Thanks Jeremy Allison). */
2473         rec->magic = TDB_MAGIC;
2474         if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2475                 return 0;
2476         }
2477
2478         /* Did we create new block? */
2479         if (newrec_ptr) {
2480                 /* Update allocated record tailer (we
2481                    shortened it). */
2482                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2483                         return 0;
2484                 }
2485
2486                 /* Free new record */
2487                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2488                         return 0;
2489                 }
2490         }
2491
2492         /* all done - return the new record offset */
2493         return rec_ptr;
2494 }
2495
2496 /* allocate some space from the free list. The offset returned points
2497    to a unconnected list_struct within the database with room for at
2498    least length bytes of total data
2499
2500    0 is returned if the space could not be allocated
2501  */
2502 tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2503 {
2504         tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2505         struct {
2506                 tdb_off_t rec_ptr, last_ptr;
2507                 tdb_len_t rec_len;
2508         } bestfit;
2509
2510         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2511                 return 0;
2512
2513         /* Extra bytes required for tailer */
2514         length += sizeof(tdb_off_t);
2515
2516  again:
2517         last_ptr = FREELIST_TOP;
2518
2519         /* read in the freelist top */
2520         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2521                 goto fail;
2522
2523         bestfit.rec_ptr = 0;
2524         bestfit.last_ptr = 0;
2525         bestfit.rec_len = 0;
2526
2527         /*
2528            this is a best fit allocation strategy. Originally we used
2529            a first fit strategy, but it suffered from massive fragmentation
2530            issues when faced with a slowly increasing record size.
2531          */
2532         while (rec_ptr) {
2533                 if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2534                         goto fail;
2535                 }
2536
2537                 if (rec->rec_len >= length) {
2538                         if (bestfit.rec_ptr == 0 ||
2539                             rec->rec_len < bestfit.rec_len) {
2540                                 bestfit.rec_len = rec->rec_len;
2541                                 bestfit.rec_ptr = rec_ptr;
2542                                 bestfit.last_ptr = last_ptr;
2543                                 /* consider a fit to be good enough if
2544                                    we aren't wasting more than half
2545                                    the space */
2546                                 if (bestfit.rec_len < 2*length) {
2547                                         break;
2548                                 }
2549                         }
2550                 }
2551
2552                 /* move to the next record */
2553                 last_ptr = rec_ptr;
2554                 rec_ptr = rec->next;
2555         }
2556
2557         if (bestfit.rec_ptr != 0) {
2558                 if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2559                         goto fail;
2560                 }
2561
2562                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2563                 tdb_unlock(tdb, -1, F_WRLCK);
2564                 return newrec_ptr;
2565         }
2566
2567         /* we didn't find enough space. See if we can expand the
2568            database and if we can then try again */
2569         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2570                 goto again;
2571  fail:
2572         tdb_unlock(tdb, -1, F_WRLCK);
2573         return 0;
2574 }
2575
2576 /* file: freelistcheck.c */
2577
2578 /* Check the freelist is good and contains no loops.
2579    Very memory intensive - only do this as a consistency
2580    checker. Heh heh - uses an in memory tdb as the storage
2581    for the "seen" record list. For some reason this strikes
2582    me as extremely clever as I don't have to write another tree
2583    data structure implementation :-).
2584  */
2585
2586 static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2587 {
2588         TDB_DATA key, data;
2589
2590         memset(&data, '\0', sizeof(data));
2591         key.dptr = (unsigned char *)&rec_ptr;
2592         key.dsize = sizeof(rec_ptr);
2593         return tdb_store(mem_tdb, key, data, TDB_INSERT);
2594 }
2595
2596 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2597 {
2598         struct tdb_context *mem_tdb = NULL;
2599         struct list_struct rec;
2600         tdb_off_t rec_ptr, last_ptr;
2601         int ret = -1;
2602
2603         *pnum_entries = 0;
2604
2605         mem_tdb = tdb_open("flval", tdb->header.hash_size,
2606                                 TDB_INTERNAL, O_RDWR, 0600);
2607         if (!mem_tdb) {
2608                 return -1;
2609         }
2610
2611         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2612                 tdb_close(mem_tdb);
2613                 return 0;
2614         }
2615
2616         last_ptr = FREELIST_TOP;
2617
2618         /* Store the FREELIST_TOP record. */
2619         if (seen_insert(mem_tdb, last_ptr) == -1) {
2620                 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2621                 goto fail;
2622         }
2623
2624         /* read in the freelist top */
2625         if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2626                 goto fail;
2627         }
2628
2629         while (rec_ptr) {
2630
2631                 /* If we can't store this record (we've seen it
2632                    before) then the free list has a loop and must
2633                    be corrupt. */
2634
2635                 if (seen_insert(mem_tdb, rec_ptr)) {
2636                         ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2637                         goto fail;
2638                 }
2639
2640                 if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2641                         goto fail;
2642                 }
2643
2644                 /* move to the next record */
2645                 last_ptr = rec_ptr;
2646                 rec_ptr = rec.next;
2647                 *pnum_entries += 1;
2648         }
2649
2650         ret = 0;
2651
2652   fail:
2653
2654         tdb_close(mem_tdb);
2655         tdb_unlock(tdb, -1, F_WRLCK);
2656         return ret;
2657 }
2658
2659 /* file: traverse.c */
2660
2661 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2662 static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2663                          struct list_struct *rec)
2664 {
2665         int want_next = (tlock->off != 0);
2666
2667         /* Lock each chain from the start one. */
2668         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2669                 if (!tlock->off && tlock->hash != 0) {
2670                         /* this is an optimisation for the common case where
2671                            the hash chain is empty, which is particularly
2672                            common for the use of tdb with ldb, where large
2673                            hashes are used. In that case we spend most of our
2674                            time in tdb_brlock(), locking empty hash chains.
2675
2676                            To avoid this, we do an unlocked pre-check to see
2677                            if the hash chain is empty before starting to look
2678                            inside it. If it is empty then we can avoid that
2679                            hash chain. If it isn't empty then we can't believe
2680                            the value we get back, as we read it without a
2681                            lock, so instead we get the lock and re-fetch the
2682                            value below.
2683
2684                            Notice that not doing this optimisation on the
2685                            first hash chain is critical. We must guarantee
2686                            that we have done at least one fcntl lock at the
2687                            start of a search to guarantee that memory is
2688                            coherent on SMP systems. If records are added by
2689                            others during the search then that's OK, and we
2690                            could possibly miss those with this trick, but we
2691                            could miss them anyway without this trick, so the
2692                            semantics don't change.
2693
2694                            With a non-indexed ldb search this trick gains us a
2695                            factor of around 80 in speed on a linux 2.6.x
2696                            system (testing using ldbtest).
2697                         */
2698                         tdb->methods->next_hash_chain(tdb, &tlock->hash);
2699                         if (tlock->hash == tdb->header.hash_size) {
2700                                 continue;
2701                         }
2702                 }
2703
2704                 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2705                         return -1;
2706
2707                 /* No previous record?  Start at top of chain. */
2708                 if (!tlock->off) {
2709                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2710                                      &tlock->off) == -1)
2711                                 goto fail;
2712                 } else {
2713                         /* Otherwise unlock the previous record. */
2714                         if (tdb_unlock_record(tdb, tlock->off) != 0)
2715                                 goto fail;
2716                 }
2717
2718                 if (want_next) {
2719                         /* We have offset of old record: grab next */
2720                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2721                                 goto fail;
2722                         tlock->off = rec->next;
2723                 }
2724
2725                 /* Iterate through chain */
2726                 while( tlock->off) {
2727                         tdb_off_t current;
2728                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2729                                 goto fail;
2730
2731                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2732                         if (tlock->off == rec->next) {
2733                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2734                                 goto fail;
2735                         }
2736
2737                         if (!TDB_DEAD(rec)) {
2738                                 /* Woohoo: we found one! */
2739                                 if (tdb_lock_record(tdb, tlock->off) != 0)
2740                                         goto fail;
2741                                 return tlock->off;
2742                         }
2743
2744                         /* Try to clean dead ones from old traverses */
2745                         current = tlock->off;
2746                         tlock->off = rec->next;
2747                         if (!(tdb->read_only || tdb->traverse_read) &&
2748                             tdb_do_delete(tdb, current, rec) != 0)
2749                                 goto fail;
2750                 }
2751                 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2752                 want_next = 0;
2753         }
2754         /* We finished iteration without finding anything */
2755         return TDB_ERRCODE(TDB_SUCCESS, 0);
2756
2757  fail:
2758         tlock->off = 0;
2759         if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2760                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2761         return -1;
2762 }
2763
2764 /* traverse the entire database - calling fn(tdb, key, data) on each element.
2765    return -1 on error or the record count traversed
2766    if fn is NULL then it is not called
2767    a non-zero return value from fn() indicates that the traversal should stop
2768   */
2769 static int tdb_traverse_internal(struct tdb_context *tdb,
2770                                  tdb_traverse_func fn, void *private_data,
2771                                  struct tdb_traverse_lock *tl)
2772 {
2773         TDB_DATA key, dbuf;
2774         struct list_struct rec;
2775         int ret, count = 0;
2776
2777         /* This was in the initialization, above, but the IRIX compiler
2778          * did not like it.  crh
2779          */
2780         tl->next = tdb->travlocks.next;
2781
2782         /* fcntl locks don't stack: beware traverse inside traverse */
2783         tdb->travlocks.next = tl;
2784
2785         /* tdb_next_lock places locks on the record returned, and its chain */
2786         while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2787                 count++;
2788                 /* now read the full record */
2789                 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2790                                           rec.key_len + rec.data_len);
2791                 if (!key.dptr) {
2792                         ret = -1;
2793                         if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2794                                 goto out;
2795                         if (tdb_unlock_record(tdb, tl->off) != 0)
2796                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2797                         goto out;
2798                 }
2799                 key.dsize = rec.key_len;
2800                 dbuf.dptr = key.dptr + rec.key_len;
2801                 dbuf.dsize = rec.data_len;
2802
2803                 /* Drop chain lock, call out */
2804                 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2805                         ret = -1;
2806                         SAFE_FREE(key.dptr);
2807                         goto out;
2808                 }
2809                 if (fn && fn(tdb, key, dbuf, private_data)) {
2810                         /* They want us to terminate traversal */
2811                         ret = count;
2812                         if (tdb_unlock_record(tdb, tl->off) != 0) {
2813                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
2814                                 ret = -1;
2815                         }
2816                         SAFE_FREE(key.dptr);
2817                         goto out;
2818                 }
2819                 SAFE_FREE(key.dptr);
2820         }
2821 out:
2822         tdb->travlocks.next = tl->next;
2823         if (ret < 0)
2824                 return -1;
2825         else
2826                 return count;
2827 }
2828
2829
2830 /*
2831   a write style traverse - temporarily marks the db read only
2832 */
2833 int tdb_traverse_read(struct tdb_context *tdb,
2834                       tdb_traverse_func fn, void *private_data)
2835 {
2836         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2837         int ret;
2838
2839         /* we need to get a read lock on the transaction lock here to
2840            cope with the lock ordering semantics of solaris10 */
2841         if (tdb_transaction_lock(tdb, F_RDLCK)) {
2842                 return -1;
2843         }
2844
2845         tdb->traverse_read++;
2846         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2847         tdb->traverse_read--;
2848
2849         tdb_transaction_unlock(tdb);
2850
2851         return ret;
2852 }
2853
2854 /*
2855   a write style traverse - needs to get the transaction lock to
2856   prevent deadlocks
2857 */
2858 int tdb_traverse(struct tdb_context *tdb,
2859                  tdb_traverse_func fn, void *private_data)
2860 {
2861         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2862         int ret;
2863
2864         if (tdb->read_only || tdb->traverse_read) {
2865                 return tdb_traverse_read(tdb, fn, private_data);
2866         }
2867
2868         if (tdb_transaction_lock(tdb, F_WRLCK)) {
2869                 return -1;
2870         }
2871
2872         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2873
2874         tdb_transaction_unlock(tdb);
2875
2876         return ret;
2877 }
2878
2879
2880 /* find the first entry in the database and return its key */
2881 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2882 {
2883         TDB_DATA key;
2884         struct list_struct rec;
2885
2886         /* release any old lock */
2887         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2888                 return tdb_null;
2889         tdb->travlocks.off = tdb->travlocks.hash = 0;
2890         tdb->travlocks.lock_rw = F_RDLCK;
2891
2892         /* Grab first record: locks chain and returned record. */
2893         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2894                 return tdb_null;
2895         /* now read the key */
2896         key.dsize = rec.key_len;
2897         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2898
2899         /* Unlock the hash chain of the record we just read. */
2900         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2901                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2902         return key;
2903 }
2904
2905 /* find the next entry in the database, returning its key */
2906 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2907 {
2908         u32 oldhash;
2909         TDB_DATA key = tdb_null;
2910         struct list_struct rec;
2911         unsigned char *k = NULL;
2912
2913         /* Is locked key the old key?  If so, traverse will be reliable. */
2914         if (tdb->travlocks.off) {
2915                 if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2916                         return tdb_null;
2917                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2918                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2919                                             rec.key_len))
2920                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2921                         /* No, it wasn't: unlock it and start from scratch */
2922                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2923                                 SAFE_FREE(k);
2924                                 return tdb_null;
2925                         }
2926                         if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2927                                 SAFE_FREE(k);
2928                                 return tdb_null;
2929                         }
2930                         tdb->travlocks.off = 0;
2931                 }
2932
2933                 SAFE_FREE(k);
2934         }
2935
2936         if (!tdb->travlocks.off) {
2937                 /* No previous element: do normal find, and lock record */
2938                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2939                 if (!tdb->travlocks.off)
2940                         return tdb_null;
2941                 tdb->travlocks.hash = BUCKET(rec.full_hash);
2942                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2943                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2944                         return tdb_null;
2945                 }
2946         }
2947         oldhash = tdb->travlocks.hash;
2948
2949         /* Grab next record: locks chain and returned record,
2950            unlocks old record */
2951         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2952                 key.dsize = rec.key_len;
2953                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2954                                           key.dsize);
2955                 /* Unlock the chain of this new record */
2956                 if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2957                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2958         }
2959         /* Unlock the chain of old record */
2960         if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2961                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2962         return key;
2963 }
2964
2965 /* file: dump.c */
2966
2967 static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2968                                  tdb_off_t offset)
2969 {
2970         struct list_struct rec;
2971         tdb_off_t tailer_ofs, tailer;
2972
2973         if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2974                                    sizeof(rec), DOCONV()) == -1) {
2975                 printf("ERROR: failed to read record at %u\n", offset);
2976                 return 0;
2977         }
2978
2979         printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2980                "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2981                hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2982                rec.full_hash, rec.magic);
2983
2984         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2985
2986         if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2987                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2988                 return rec.next;
2989         }
2990
2991         if (tailer != rec.rec_len + sizeof(rec)) {
2992                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2993                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2994         }
2995         return rec.next;
2996 }
2997
2998 static int tdb_dump_chain(struct tdb_context *tdb, int i)
2999 {
3000         tdb_off_t rec_ptr, top;
3001
3002         top = TDB_HASH_TOP(i);
3003
3004         if (tdb_lock(tdb, i, F_WRLCK) != 0)
3005                 return -1;
3006
3007         if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
3008                 return tdb_unlock(tdb, i, F_WRLCK);
3009
3010         if (rec_ptr)
3011                 printf("hash=%d\n", i);
3012
3013         while (rec_ptr) {
3014                 rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
3015         }
3016
3017         return tdb_unlock(tdb, i, F_WRLCK);
3018 }
3019
3020 void tdb_dump_all(struct tdb_context *tdb)
3021 {
3022         int i;
3023         for (i = 0; i < (int)tdb->header.hash_size; i++) {
3024                 tdb_dump_chain(tdb, i);
3025         }
3026         printf("freelist:\n");
3027         tdb_dump_chain(tdb, -1);
3028 }
3029
3030 int tdb_printfreelist(struct tdb_context *tdb)
3031 {
3032         int ret;
3033         long total_free = 0;
3034         tdb_off_t offset, rec_ptr;
3035         struct list_struct rec;
3036
3037         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3038                 return ret;
3039
3040         offset = FREELIST_TOP;
3041
3042         /* read in the freelist top */
3043         if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3044                 tdb_unlock(tdb, -1, F_WRLCK);
3045                 return 0;
3046         }
3047
3048         printf("freelist top=[0x%08x]\n", rec_ptr );
3049         while (rec_ptr) {
3050                 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3051                                            sizeof(rec), DOCONV()) == -1) {
3052                         tdb_unlock(tdb, -1, F_WRLCK);
3053                         return -1;
3054                 }
3055
3056                 if (rec.magic != TDB_FREE_MAGIC) {
3057                         printf("bad magic 0x%08x in free list\n", rec.magic);
3058                         tdb_unlock(tdb, -1, F_WRLCK);
3059                         return -1;
3060                 }
3061
3062                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3063                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3064                 total_free += rec.rec_len;
3065
3066                 /* move to the next record */
3067                 rec_ptr = rec.next;
3068         }
3069         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3070                (int)total_free);
3071
3072         return tdb_unlock(tdb, -1, F_WRLCK);
3073 }
3074
3075 /* file: tdb.c */
3076
3077 /*
3078   non-blocking increment of the tdb sequence number if the tdb has been opened using
3079   the TDB_SEQNUM flag
3080 */
3081 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3082 {
3083         tdb_off_t seqnum=0;
3084
3085         if (!(tdb->flags & TDB_SEQNUM)) {
3086                 return;
3087         }
3088
3089         /* we ignore errors from this, as we have no sane way of
3090            dealing with them.
3091         */
3092         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3093         seqnum++;
3094         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3095 }
3096
3097 /*
3098   increment the tdb sequence number if the tdb has been opened using
3099   the TDB_SEQNUM flag
3100 */
3101 static void tdb_increment_seqnum(struct tdb_context *tdb)
3102 {
3103         if (!(tdb->flags & TDB_SEQNUM)) {
3104                 return;
3105         }
3106
3107         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3108                 return;
3109         }
3110
3111         tdb_increment_seqnum_nonblock(tdb);
3112
3113         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3114 }
3115
3116 static int tdb_key_compare(TDB_DATA key, TDB_DATA data,
3117                            void *private_data EXT2FS_ATTR((unused)))
3118 {
3119         return memcmp(data.dptr, key.dptr, data.dsize);
3120 }
3121
3122 /* Returns 0 on fail.  On success, return offset of record, and fills
3123    in rec */
3124 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3125                         struct list_struct *r)
3126 {
3127         tdb_off_t rec_ptr;
3128
3129         /* read in the hash top */
3130         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3131                 return 0;
3132
3133         /* keep looking until we find the right record */
3134         while (rec_ptr) {
3135                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3136                         return 0;
3137
3138                 if (!TDB_DEAD(r) && hash==r->full_hash
3139                     && key.dsize==r->key_len
3140                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3141                                       r->key_len, tdb_key_compare,
3142                                       NULL) == 0) {
3143                         return rec_ptr;
3144                 }
3145                 rec_ptr = r->next;
3146         }
3147         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3148 }
3149
3150 /* As tdb_find, but if you succeed, keep the lock */
3151 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3152                            struct list_struct *rec)
3153 {
3154         u32 rec_ptr;
3155
3156         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3157                 return 0;
3158         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3159                 tdb_unlock(tdb, BUCKET(hash), locktype);
3160         return rec_ptr;
3161 }
3162
3163
3164 /* update an entry in place - this only works if the new data size
3165    is <= the old data size and the key exists.
3166    on failure return -1.
3167 */
3168 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3169 {
3170         struct list_struct rec;
3171         tdb_off_t rec_ptr;
3172
3173         /* find entry */
3174         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3175                 return -1;
3176
3177         /* must be long enough key, data and tailer */
3178         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3179                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3180                 return -1;
3181         }
3182
3183         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3184                       dbuf.dptr, dbuf.dsize) == -1)
3185                 return -1;
3186
3187         if (dbuf.dsize != rec.data_len) {
3188                 /* update size */
3189                 rec.data_len = dbuf.dsize;
3190                 return tdb_rec_write(tdb, rec_ptr, &rec);
3191         }
3192
3193         return 0;
3194 }
3195
3196 /* find an entry in the database given a key */
3197 /* If an entry doesn't exist tdb_err will be set to
3198  * TDB_ERR_NOEXIST. If a key has no data attached
3199  * then the TDB_DATA will have zero length but
3200  * a non-zero pointer
3201  */
3202 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3203 {
3204         tdb_off_t rec_ptr;
3205         struct list_struct rec;
3206         TDB_DATA ret;
3207         u32 hash;
3208
3209         /* find which hash bucket it is in */
3210         hash = tdb->hash_fn(&key);
3211         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3212                 return tdb_null;
3213
3214         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3215                                   rec.data_len);
3216         ret.dsize = rec.data_len;
3217         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3218         return ret;
3219 }
3220
3221 /*
3222  * Find an entry in the database and hand the record's data to a parsing
3223  * function. The parsing function is executed under the chain read lock, so it
3224  * should be fast and should not block on other syscalls.
3225  *
3226  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3227  *
3228  * For mmapped tdb's that do not have a transaction open it points the parsing
3229  * function directly at the mmap area, it avoids the malloc/memcpy in this
3230  * case. If a transaction is open or no mmap is available, it has to do
3231  * malloc/read/parse/free.
3232  *
3233  * This is interesting for all readers of potentially large data structures in
3234  * the tdb records, ldb indexes being one example.
3235  */
3236
3237 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3238                      int (*parser)(TDB_DATA key, TDB_DATA data,
3239                                    void *private_data),
3240                      void *private_data)
3241 {
3242         tdb_off_t rec_ptr;
3243         struct list_struct rec;
3244         int ret;
3245         u32 hash;
3246
3247         /* find which hash bucket it is in */
3248         hash = tdb->hash_fn(&key);
3249
3250         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3251                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3252         }
3253
3254         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3255                              rec.data_len, parser, private_data);
3256
3257         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3258
3259         return ret;
3260 }
3261
3262 /* check if an entry in the database exists
3263
3264    note that 1 is returned if the key is found and 0 is returned if not found
3265    this doesn't match the conventions in the rest of this module, but is
3266    compatible with gdbm
3267 */
3268 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3269 {
3270         struct list_struct rec;
3271
3272         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3273                 return 0;
3274         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3275         return 1;
3276 }
3277
3278 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3279 {
3280         u32 hash = tdb->hash_fn(&key);
3281         return tdb_exists_hash(tdb, key, hash);
3282 }
3283
3284 /* actually delete an entry in the database given the offset */
3285 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3286 {
3287         tdb_off_t last_ptr, i;
3288         struct list_struct lastrec;
3289
3290         if (tdb->read_only || tdb->traverse_read) return -1;
3291
3292         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3293                 /* Someone traversing here: mark it as dead */
3294                 rec->magic = TDB_DEAD_MAGIC;
3295                 return tdb_rec_write(tdb, rec_ptr, rec);
3296         }
3297         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3298                 return -1;
3299
3300         /* find previous record in hash chain */
3301         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3302                 return -1;
3303         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3304                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3305                         return -1;
3306
3307         /* unlink it: next ptr is at start of record. */
3308         if (last_ptr == 0)
3309                 last_ptr = TDB_HASH_TOP(rec->full_hash);
3310         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3311                 return -1;
3312
3313         /* recover the space */
3314         if (tdb_free(tdb, rec_ptr, rec) == -1)
3315                 return -1;
3316         return 0;
3317 }
3318
3319 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3320 {
3321         int res = 0;
3322         tdb_off_t rec_ptr;
3323         struct list_struct rec;
3324
3325         /* read in the hash top */
3326         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3327                 return 0;
3328
3329         while (rec_ptr) {
3330                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3331                         return 0;
3332
3333                 if (rec.magic == TDB_DEAD_MAGIC) {
3334                         res += 1;
3335                 }
3336                 rec_ptr = rec.next;
3337         }
3338         return res;
3339 }
3340
3341 /*
3342  * Purge all DEAD records from a hash chain
3343  */
3344 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3345 {
3346         int res = -1;
3347         struct list_struct rec;
3348         tdb_off_t rec_ptr;
3349
3350         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3351                 return -1;
3352         }
3353
3354         /* read in the hash top */
3355         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3356                 goto fail;
3357
3358         while (rec_ptr) {
3359                 tdb_off_t next;
3360
3361                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3362                         goto fail;
3363                 }
3364
3365                 next = rec.next;
3366
3367                 if (rec.magic == TDB_DEAD_MAGIC
3368                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3369                         goto fail;
3370                 }
3371                 rec_ptr = next;
3372         }
3373         res = 0;
3374  fail:
3375         tdb_unlock(tdb, -1, F_WRLCK);
3376         return res;
3377 }
3378
3379 /* delete an entry in the database given a key */
3380 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3381 {
3382         tdb_off_t rec_ptr;
3383         struct list_struct rec;
3384         int ret;
3385
3386         if (tdb->max_dead_records != 0) {
3387
3388                 /*
3389                  * Allow for some dead records per hash chain, mainly for
3390                  * tdb's with a very high create/delete rate like locking.tdb.
3391                  */
3392
3393                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3394                         return -1;
3395
3396                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3397                         /*
3398                          * Don't let the per-chain freelist grow too large,
3399                          * delete all existing dead records
3400                          */
3401                         tdb_purge_dead(tdb, hash);
3402                 }
3403
3404                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3405                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3406                         return -1;
3407                 }
3408
3409                 /*
3410                  * Just mark the record as dead.
3411                  */
3412                 rec.magic = TDB_DEAD_MAGIC;
3413                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3414         }
3415         else {
3416                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3417                                                    &rec)))
3418                         return -1;
3419
3420                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3421         }
3422
3423         if (ret == 0) {
3424                 tdb_increment_seqnum(tdb);
3425         }
3426
3427         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3428                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3429         return ret;
3430 }
3431
3432 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3433 {
3434         u32 hash = tdb->hash_fn(&key);
3435         return tdb_delete_hash(tdb, key, hash);
3436 }
3437
3438 /*
3439  * See if we have a dead record around with enough space
3440  */
3441 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3442                                struct list_struct *r, tdb_len_t length)
3443 {
3444         tdb_off_t rec_ptr;
3445
3446         /* read in the hash top */
3447         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3448                 return 0;
3449
3450         /* keep looking until we find the right record */
3451         while (rec_ptr) {
3452                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3453                         return 0;
3454
3455                 if (TDB_DEAD(r) && r->rec_len >= length) {
3456                         /*
3457                          * First fit for simple coding, TODO: change to best
3458                          * fit
3459                          */
3460                         return rec_ptr;
3461                 }
3462                 rec_ptr = r->next;
3463         }
3464         return 0;
3465 }
3466
3467 /* store an element in the database, replacing any existing element
3468    with the same key
3469
3470    return 0 on success, -1 on failure
3471 */
3472 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3473 {
3474         struct list_struct rec;
3475         u32 hash;
3476         tdb_off_t rec_ptr;
3477         char *p = NULL;
3478         int ret = -1;
3479
3480         if (tdb->read_only || tdb->traverse_read) {
3481                 tdb->ecode = TDB_ERR_RDONLY;
3482                 return -1;
3483         }
3484
3485         /* find which hash bucket it is in */
3486         hash = tdb->hash_fn(&key);
3487         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3488                 return -1;
3489
3490         /* check for it existing, on insert. */
3491         if (flag == TDB_INSERT) {
3492                 if (tdb_exists_hash(tdb, key, hash)) {
3493                         tdb->ecode = TDB_ERR_EXISTS;
3494                         goto fail;
3495                 }
3496         } else {
3497                 /* first try in-place update, on modify or replace. */
3498                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3499                         goto done;
3500                 }
3501                 if (tdb->ecode == TDB_ERR_NOEXIST &&
3502                     flag == TDB_MODIFY) {
3503                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
3504                          we should fail the store */
3505                         goto fail;
3506                 }
3507         }
3508         /* reset the error code potentially set by the tdb_update() */
3509         tdb->ecode = TDB_SUCCESS;
3510
3511         /* delete any existing record - if it doesn't exist we don't
3512            care.  Doing this first reduces fragmentation, and avoids
3513            coalescing with `allocated' block before it's updated. */
3514         if (flag != TDB_INSERT)
3515                 tdb_delete_hash(tdb, key, hash);
3516
3517         /* Copy key+value *before* allocating free space in case malloc
3518            fails and we are left with a dead spot in the tdb. */
3519
3520         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3521                 tdb->ecode = TDB_ERR_OOM;
3522                 goto fail;
3523         }
3524
3525         memcpy(p, key.dptr, key.dsize);
3526         if (dbuf.dsize)
3527                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3528
3529         if (tdb->max_dead_records != 0) {
3530                 /*
3531                  * Allow for some dead records per hash chain, look if we can
3532                  * find one that can hold the new record. We need enough space
3533                  * for key, data and tailer. If we find one, we don't have to
3534                  * consult the central freelist.
3535                  */
3536                 rec_ptr = tdb_find_dead(
3537                         tdb, hash, &rec,
3538                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3539
3540                 if (rec_ptr != 0) {
3541                         rec.key_len = key.dsize;
3542                         rec.data_len = dbuf.dsize;
3543                         rec.full_hash = hash;
3544                         rec.magic = TDB_MAGIC;
3545                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3546                             || tdb->methods->tdb_write(
3547                                     tdb, rec_ptr + sizeof(rec),
3548                                     p, key.dsize + dbuf.dsize) == -1) {
3549                                 goto fail;
3550                         }
3551                         goto done;
3552                 }
3553         }
3554
3555         /*
3556          * We have to allocate some space from the freelist, so this means we
3557          * have to lock it. Use the chance to purge all the DEAD records from
3558          * the hash chain under the freelist lock.
3559          */
3560
3561         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3562                 goto fail;
3563         }
3564
3565         if ((tdb->max_dead_records != 0)
3566             && (tdb_purge_dead(tdb, hash) == -1)) {
3567                 tdb_unlock(tdb, -1, F_WRLCK);
3568                 goto fail;
3569         }
3570
3571         /* we have to allocate some space */
3572         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3573
3574         tdb_unlock(tdb, -1, F_WRLCK);
3575
3576         if (rec_ptr == 0) {
3577                 goto fail;
3578         }
3579
3580         /* Read hash top into next ptr */
3581         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3582                 goto fail;
3583
3584         rec.key_len = key.dsize;
3585         rec.data_len = dbuf.dsize;
3586         rec.full_hash = hash;
3587         rec.magic = TDB_MAGIC;
3588
3589         /* write out and point the top of the hash chain at it */
3590         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3591             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3592             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3593                 /* Need to tdb_unallocate() here */
3594                 goto fail;
3595         }
3596
3597  done:
3598         ret = 0;
3599  fail:
3600         if (ret == 0) {
3601                 tdb_increment_seqnum(tdb);
3602         }
3603
3604         SAFE_FREE(p);
3605         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3606         return ret;
3607 }
3608
3609
3610 /* Append to an entry. Create if not exist. */
3611 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3612 {
3613         u32 hash;
3614         TDB_DATA dbuf;
3615         int ret = -1;
3616
3617         /* find which hash bucket it is in */
3618         hash = tdb->hash_fn(&key);
3619         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3620                 return -1;
3621
3622         dbuf = tdb_fetch(tdb, key);
3623
3624         if (dbuf.dptr == NULL) {
3625                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3626         } else {
3627                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3628                                                      dbuf.dsize + new_dbuf.dsize);
3629                 if (new_dptr == NULL) {
3630                         free(dbuf.dptr);
3631                 }
3632                 dbuf.dptr = new_dptr;
3633         }
3634
3635         if (dbuf.dptr == NULL) {
3636                 tdb->ecode = TDB_ERR_OOM;
3637                 goto failed;
3638         }
3639
3640         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3641         dbuf.dsize += new_dbuf.dsize;
3642
3643         ret = tdb_store(tdb, key, dbuf, 0);
3644
3645 failed:
3646         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3647         SAFE_FREE(dbuf.dptr);
3648         return ret;
3649 }
3650
3651
3652 /*
3653   return the name of the current tdb file
3654   useful for external logging functions
3655 */
3656 const char *tdb_name(struct tdb_context *tdb)
3657 {
3658         return tdb->name;
3659 }
3660
3661 /*
3662   return the underlying file descriptor being used by tdb, or -1
3663   useful for external routines that want to check the device/inode
3664   of the fd
3665 */
3666 int tdb_fd(struct tdb_context *tdb)
3667 {
3668         return tdb->fd;
3669 }
3670
3671 /*
3672   return the current logging function
3673   useful for external tdb routines that wish to log tdb errors
3674 */
3675 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3676 {
3677         return tdb->log.log_fn;
3678 }
3679
3680
3681 /*
3682   get the tdb sequence number. Only makes sense if the writers opened
3683   with TDB_SEQNUM set. Note that this sequence number will wrap quite
3684   quickly, so it should only be used for a 'has something changed'
3685   test, not for code that relies on the count of the number of changes
3686   made. If you want a counter then use a tdb record.
3687
3688   The aim of this sequence number is to allow for a very lightweight
3689   test of a possible tdb change.
3690 */
3691 int tdb_get_seqnum(struct tdb_context *tdb)
3692 {
3693         tdb_off_t seqnum=0;
3694
3695         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3696         return seqnum;
3697 }
3698
3699 int tdb_hash_size(struct tdb_context *tdb)
3700 {
3701         return tdb->header.hash_size;
3702 }
3703
3704 size_t tdb_map_size(struct tdb_context *tdb)
3705 {
3706         return tdb->map_size;
3707 }
3708
3709 int tdb_get_flags(struct tdb_context *tdb)
3710 {
3711         return tdb->flags;
3712 }
3713
3714
3715 /*
3716   enable sequence number handling on an open tdb
3717 */
3718 void tdb_enable_seqnum(struct tdb_context *tdb)
3719 {
3720         tdb->flags |= TDB_SEQNUM;
3721 }
3722
3723 /* file: open.c */
3724
3725 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3726 static struct tdb_context *tdbs = NULL;
3727
3728
3729 /* This is from a hash algorithm suggested by Rogier Wolff */
3730 static unsigned int default_tdb_hash(TDB_DATA *key)
3731 {
3732         u32 value;      /* Used to compute the hash value.  */
3733         u32   i;        /* Used to cycle through random values. */
3734
3735         /* Set the initial value from the key size. */
3736         for (value = 0, i=0; i < key->dsize; i++)
3737                 value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3738
3739         return value;
3740 }
3741
3742
3743 /* initialise a new database with a specified hash size */
3744 static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3745 {
3746         struct tdb_header *newdb;
3747         int size, ret = -1;
3748
3749         /* We make it up in memory, then write it out if not internal */
3750         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3751         if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3752                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3753
3754         /* Fill in the header */
3755         newdb->version = TDB_VERSION;
3756         newdb->hash_size = hash_size;
3757         if (tdb->flags & TDB_INTERNAL) {
3758                 tdb->map_size = size;
3759                 tdb->map_ptr = (char *)newdb;
3760                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3761                 /* Convert the `ondisk' version if asked. */
3762                 CONVERT(*newdb);
3763                 return 0;
3764         }
3765         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3766                 goto fail;
3767
3768         if (ftruncate(tdb->fd, 0) == -1)
3769                 goto fail;
3770
3771         /* This creates an endian-converted header, as if read from disk */
3772         CONVERT(*newdb);
3773         memcpy(&tdb->header, newdb, sizeof(tdb->header));
3774         /* Don't endian-convert the magic food! */
3775         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3776         if (write(tdb->fd, newdb, size) != size) {
3777                 ret = -1;
3778         } else {
3779                 ret = 0;
3780         }
3781
3782   fail:
3783         SAFE_FREE(newdb);
3784         return ret;
3785 }
3786
3787
3788
3789 static int tdb_already_open(dev_t device,
3790                             ino_t ino)
3791 {
3792         struct tdb_context *i;
3793
3794         for (i = tdbs; i; i = i->next) {
3795                 if (i->device == device && i->inode == ino) {
3796                         return 1;
3797                 }
3798         }
3799
3800         return 0;
3801 }
3802
3803 /* open the database, creating it if necessary
3804
3805    The open_flags and mode are passed straight to the open call on the
3806    database file. A flags value of O_WRONLY is invalid. The hash size
3807    is advisory, use zero for a default value.
3808
3809    Return is NULL on error, in which case errno is also set.  Don't
3810    try to call tdb_error or tdb_errname, just do strerror(errno).
3811
3812    @param name may be NULL for internal databases. */
3813 struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3814                       int open_flags, mode_t mode)
3815 {
3816         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3817 }
3818
3819 /* a default logging function */
3820 static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3821 static void null_log_fn(struct tdb_context *tdb EXT2FS_ATTR((unused)),
3822                         enum tdb_debug_level level EXT2FS_ATTR((unused)),
3823                         const char *fmt EXT2FS_ATTR((unused)), ...)
3824 {
3825 }
3826
3827
3828 struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3829                                 int open_flags, mode_t mode,
3830                                 const struct tdb_logging_context *log_ctx,
3831                                 tdb_hash_func hash_fn)
3832 {
3833         struct tdb_context *tdb;
3834         struct stat st;
3835         int rev = 0, locked = 0;
3836         unsigned char *vp;
3837         u32 vertest;
3838
3839         if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3840                 /* Can't log this */
3841                 errno = ENOMEM;
3842                 goto fail;
3843         }
3844         tdb_io_init(tdb);
3845         tdb->fd = -1;
3846         tdb->name = NULL;
3847         tdb->map_ptr = NULL;
3848         tdb->flags = tdb_flags;
3849         tdb->open_flags = open_flags;
3850         if (log_ctx) {
3851                 tdb->log = *log_ctx;
3852         } else {
3853                 tdb->log.log_fn = null_log_fn;
3854                 tdb->log.log_private = NULL;
3855         }
3856         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3857
3858         /* cache the page size */
3859         tdb->page_size = sysconf(_SC_PAGESIZE);
3860         if (tdb->page_size <= 0) {
3861                 tdb->page_size = 0x2000;
3862         }
3863
3864         if ((open_flags & O_ACCMODE) == O_WRONLY) {
3865                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3866                          name));
3867                 errno = EINVAL;
3868                 goto fail;
3869         }
3870
3871         if (hash_size == 0)
3872                 hash_size = DEFAULT_HASH_SIZE;
3873         if ((open_flags & O_ACCMODE) == O_RDONLY) {
3874                 tdb->read_only = 1;
3875                 /* read only databases don't do locking or clear if first */
3876                 tdb->flags |= TDB_NOLOCK;
3877                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3878         }
3879
3880         /* internal databases don't mmap or lock, and start off cleared */
3881         if (tdb->flags & TDB_INTERNAL) {
3882                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3883                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3884                 if (tdb_new_database(tdb, hash_size) != 0) {
3885                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3886                         goto fail;
3887                 }
3888                 goto internal;
3889         }
3890
3891         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3892                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3893                          name, strerror(errno)));
3894                 goto fail;      /* errno set by open(2) */
3895         }
3896
3897         /* ensure there is only one process initialising at once */
3898         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3899                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3900                          name, strerror(errno)));
3901                 goto fail;      /* errno set by tdb_brlock */
3902         }
3903
3904         /* we need to zero database if we are the only one with it open */
3905         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3906             (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3907                 open_flags |= O_CREAT;
3908                 if (ftruncate(tdb->fd, 0) == -1) {
3909                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3910                                  "failed to truncate %s: %s\n",
3911                                  name, strerror(errno)));
3912                         goto fail; /* errno set by ftruncate */
3913                 }
3914         }
3915
3916         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3917             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3918             || (tdb->header.version != TDB_VERSION
3919                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3920                 /* its not a valid database - possibly initialise it */
3921                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3922                         errno = EIO; /* ie bad format or something */
3923                         goto fail;
3924                 }
3925                 rev = (tdb->flags & TDB_CONVERT);
3926         }
3927         vp = (unsigned char *)&tdb->header.version;
3928         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3929                   (((u32)vp[2]) << 8) | (u32)vp[3];
3930         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3931         if (!rev)
3932                 tdb->flags &= ~TDB_CONVERT;
3933         else {
3934                 tdb->flags |= TDB_CONVERT;
3935                 tdb_convert(&tdb->header, sizeof(tdb->header));
3936         }
3937         if (fstat(tdb->fd, &st) == -1)
3938                 goto fail;
3939
3940         if (tdb->header.rwlocks != 0) {
3941                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3942                 goto fail;
3943         }
3944
3945         /* Is it already in the open list?  If so, fail. */
3946         if (tdb_already_open(st.st_dev, st.st_ino)) {
3947                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3948                          "%s (%d,%d) is already open in this process\n",
3949                          name, (int)st.st_dev, (int)st.st_ino));
3950                 errno = EBUSY;
3951                 goto fail;
3952         }
3953
3954         if (!(tdb->name = (char *)strdup(name))) {
3955                 errno = ENOMEM;
3956                 goto fail;
3957         }
3958
3959         tdb->map_size = st.st_size;
3960         tdb->device = st.st_dev;
3961         tdb->inode = st.st_ino;
3962         tdb->max_dead_records = 0;
3963         tdb_mmap(tdb);
3964         if (locked) {
3965                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3966                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3967                                  "failed to take ACTIVE_LOCK on %s: %s\n",
3968                                  name, strerror(errno)));
3969                         goto fail;
3970                 }
3971
3972         }
3973
3974         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3975            we didn't get the initial exclusive lock as we need to let all other
3976            users know we're using it. */
3977
3978         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3979                 /* leave this lock in place to indicate it's in use */
3980                 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3981                         goto fail;
3982         }
3983
3984         /* if needed, run recovery */
3985         if (tdb_transaction_recover(tdb) == -1) {
3986                 goto fail;
3987         }
3988
3989  internal:
3990         /* Internal (memory-only) databases skip all the code above to
3991          * do with disk files, and resume here by releasing their
3992          * global lock and hooking into the active list. */
3993         if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3994                 goto fail;
3995         tdb->next = tdbs;
3996         tdbs = tdb;
3997         return tdb;
3998
3999  fail:
4000         { int save_errno = errno;
4001
4002         if (!tdb)
4003                 return NULL;
4004
4005         if (tdb->map_ptr) {
4006                 if (tdb->flags & TDB_INTERNAL)
4007                         SAFE_FREE(tdb->map_ptr);
4008                 else
4009                         tdb_munmap(tdb);
4010         }
4011         SAFE_FREE(tdb->name);
4012         if (tdb->fd != -1)
4013                 if (close(tdb->fd) != 0)
4014                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
4015         SAFE_FREE(tdb);
4016         errno = save_errno;
4017         return NULL;
4018         }
4019 }
4020
4021 /*
4022  * Set the maximum number of dead records per hash chain
4023  */
4024
4025 void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4026 {
4027         tdb->max_dead_records = max_dead;
4028 }
4029
4030 /**
4031  * Close a database.
4032  *
4033  * @returns -1 for error; 0 for success.
4034  **/
4035 int tdb_close(struct tdb_context *tdb)
4036 {
4037         struct tdb_context **i;
4038         int ret = 0;
4039
4040         if (tdb->transaction) {
4041                 tdb_transaction_cancel(tdb);
4042         }
4043
4044         if (tdb->map_ptr) {
4045                 if (tdb->flags & TDB_INTERNAL)
4046                         SAFE_FREE(tdb->map_ptr);
4047                 else
4048                         tdb_munmap(tdb);
4049         }
4050         SAFE_FREE(tdb->name);
4051         if (tdb->fd != -1)
4052                 ret = close(tdb->fd);
4053         SAFE_FREE(tdb->lockrecs);
4054
4055         /* Remove from contexts list */
4056         for (i = &tdbs; *i; i = &(*i)->next) {
4057                 if (*i == tdb) {
4058                         *i = tdb->next;
4059                         break;
4060                 }
4061         }
4062
4063         memset(tdb, 0, sizeof(*tdb));
4064         SAFE_FREE(tdb);
4065
4066         return ret;
4067 }
4068
4069 /* register a logging function */
4070 void tdb_set_logging_function(struct tdb_context *tdb,
4071                               const struct tdb_logging_context *log_ctx)
4072 {
4073         tdb->log = *log_ctx;
4074 }
4075
4076 void *tdb_get_logging_private(struct tdb_context *tdb)
4077 {
4078         return tdb->log.log_private;
4079 }
4080
4081 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
4082    seek pointer from our parent and to re-establish locks */
4083 int tdb_reopen(struct tdb_context *tdb)
4084 {
4085         struct stat st;
4086
4087         if (tdb->flags & TDB_INTERNAL) {
4088                 return 0; /* Nothing to do. */
4089         }
4090
4091         if (tdb->num_locks != 0 || tdb->global_lock.count) {
4092                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4093                 goto fail;
4094         }
4095
4096         if (tdb->transaction != 0) {
4097                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4098                 goto fail;
4099         }
4100
4101         if (tdb_munmap(tdb) != 0) {
4102                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4103                 goto fail;
4104         }
4105         if (close(tdb->fd) != 0)
4106                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4107         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4108         if (tdb->fd == -1) {
4109                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4110                 goto fail;
4111         }
4112         if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4113             (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4114                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4115                 goto fail;
4116         }
4117         if (fstat(tdb->fd, &st) != 0) {
4118                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4119                 goto fail;
4120         }
4121         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4122                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4123                 goto fail;
4124         }
4125         tdb_mmap(tdb);
4126
4127         return 0;
4128
4129 fail:
4130         tdb_close(tdb);
4131         return -1;
4132 }
4133
4134 /* reopen all tdb's */
4135 int tdb_reopen_all(int parent_longlived)
4136 {
4137         struct tdb_context *tdb;
4138
4139         for (tdb=tdbs; tdb; tdb = tdb->next) {
4140                 /*
4141                  * If the parent is longlived (ie. a
4142                  * parent daemon architecture), we know
4143                  * it will keep it's active lock on a
4144                  * tdb opened with CLEAR_IF_FIRST. Thus
4145                  * for child processes we don't have to
4146                  * add an active lock. This is essential
4147                  * to improve performance on systems that
4148                  * keep POSIX locks as a non-scalable data
4149                  * structure in the kernel.
4150                  */
4151                 if (parent_longlived) {
4152                         /* Ensure no clear-if-first. */
4153                         tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4154                 }
4155
4156                 if (tdb_reopen(tdb) != 0)
4157                         return -1;
4158         }
4159
4160         return 0;
4161 }
4162
4163 /**
4164  * Flush a database file from the page cache.
4165  **/
4166 int tdb_flush(struct tdb_context *tdb)
4167 {
4168         if (tdb->fd != -1)
4169                 return fsync(tdb->fd);
4170         return 0;
4171 }