Whamcloud - gitweb
minor fix for buffer offsets.
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging infrastructure.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/lvfs.h>
42 #include <linux/lustre_fsfilt.h>
43 #include <linux/lustre_log.h>
44
45 #ifdef __KERNEL__
46
47 static int llog_lvfs_pad(struct llog_ctxt *ctxt, struct l_file *file,
48                          int len, int index)
49 {
50         struct llog_rec_hdr rec;
51         struct llog_rec_tail tail;
52         int rc;
53         ENTRY;
54
55         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
56
57         tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
58         tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
59         rec.lrh_type = 0;
60
61         rc = llog_fsfilt_write_record(ctxt, file, &rec, sizeof(rec),
62                                       &file->f_pos, 0);
63         if (rc) {
64                 CERROR("error writing padding record: rc %d\n", rc);
65                 goto out;
66         }
67
68         file->f_pos += len - sizeof(rec) - sizeof(tail);
69         rc = llog_fsfilt_write_record(ctxt, file, &tail, sizeof(tail),
70                                       &file->f_pos, 0);
71         if (rc) {
72                 CERROR("error writing padding record: rc %d\n", rc);
73                 goto out;
74         }
75
76  out:
77         RETURN(rc);
78 }
79
80 static int llog_lvfs_write_blob(struct llog_ctxt *ctxt, struct l_file *file,
81                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
82 {
83         int rc;
84         struct llog_rec_tail end;
85         loff_t saved_off = file->f_pos;
86         int buflen = le32_to_cpu(rec->lrh_len);
87
88         ENTRY;
89         file->f_pos = off;
90
91         if (!buf) {
92                 rc = llog_fsfilt_write_record(ctxt, file, rec, buflen,
93                                               &file->f_pos, 0);
94                 if (rc) {
95                         CERROR("error writing log record: rc %d\n", rc);
96                         goto out;
97                 }
98                 GOTO(out, rc = 0);
99         }
100
101         /* the buf case */
102         rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
103         rc = llog_fsfilt_write_record(ctxt, file, rec, sizeof(*rec),
104                                       &file->f_pos, 0);
105         if (rc) {
106                 CERROR("error writing log hdr: rc %d\n", rc);
107                 goto out;
108         }
109
110         rc = llog_fsfilt_write_record(ctxt, file, buf, buflen,
111                                       &file->f_pos, 0);
112         if (rc) {
113                 CERROR("error writing log buffer: rc %d\n", rc);
114                 goto out;
115         }
116
117         end.lrt_len = rec->lrh_len;
118         end.lrt_index = rec->lrh_index;
119         rc = llog_fsfilt_write_record(ctxt, file, &end, sizeof(end),
120                                       &file->f_pos, 0);
121         if (rc) {
122                 CERROR("error writing log tail: rc %d\n", rc);
123                 goto out;
124         }
125
126         rc = 0;
127  out:
128         if (saved_off > file->f_pos)
129                 file->f_pos = saved_off;
130         LASSERT(rc <= 0);
131         RETURN(rc);
132 }
133
134 static int llog_lvfs_read_blob(struct llog_ctxt *ctxt, struct l_file *file,
135                                void *buf, int size, loff_t off)
136 {
137         loff_t offset = off;
138         int rc;
139         ENTRY;
140
141         rc = llog_fsfilt_read_record(ctxt, file, buf, size, &offset);
142         if (rc) {
143                 CERROR("error reading log record: rc %d\n", rc);
144                 RETURN(rc);
145         }
146         RETURN(0);
147 }
148
149 static int llog_lvfs_read_header(struct llog_handle *handle)
150 {
151         struct llog_ctxt *ctxt = handle->lgh_ctxt;
152         int rc;
153         ENTRY;
154
155         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
156         LASSERT(ctxt != NULL);
157
158         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
159                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
160                 RETURN(LLOG_EEMPTY);
161         }
162
163         rc = llog_lvfs_read_blob(ctxt, handle->lgh_file, handle->lgh_hdr,
164                                  LLOG_CHUNK_SIZE, 0);
165         if (rc)
166                 CERROR("error reading log header\n");
167
168         handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
169         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
170
171         RETURN(rc);
172 }
173
174 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
175 /* appends if idx == -1, otherwise overwrites record idx. */
176 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
177                                struct llog_rec_hdr *rec,
178                                struct llog_cookie *reccookie,
179                                int cookiecount,
180                                void *buf, int idx)
181 {
182         struct llog_log_hdr *llh;
183         int reclen = le32_to_cpu(rec->lrh_len), index, rc;
184         struct llog_rec_tail *lrt;
185         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
186         struct file *file;
187         loff_t offset;
188         size_t left;
189         ENTRY;
190
191         llh = loghandle->lgh_hdr;
192         file = loghandle->lgh_file;
193
194         /* record length should not bigger than LLOG_CHUNK_SIZE */
195         if (buf)
196                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
197                       - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
198         else
199                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
200         if (rc)
201                 RETURN(rc);
202
203         if (idx != -1) {
204                 loff_t saved_offset;
205
206                 /* no header: only allowed to insert record 1 */
207                 if (idx > 1 && !file->f_dentry->d_inode->i_size) {
208                         CERROR("idx != -1 in empty log\n");
209                         LBUG();
210                 }
211
212                 if (idx && llh->llh_size && llh->llh_size != reclen)
213                         RETURN(-EINVAL);
214
215                 rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
216                 /* we are done if we only write the header or on error */
217                 if (rc || idx == 0)
218                         RETURN(rc);
219
220                 saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
221                 rc = llog_lvfs_write_blob(ctxt, file, rec, buf, saved_offset);
222                 if (rc == 0 && reccookie) {
223                         reccookie->lgc_lgl = loghandle->lgh_id;
224                         reccookie->lgc_index = idx;
225                         rc = 1;
226                 }
227                 RETURN(rc);
228         }
229
230         /* Make sure that records don't cross a chunk boundary, so we can
231          * process them page-at-a-time if needed.  If it will cross a chunk
232          * boundary, write in a fake (but referenced) entry to pad the chunk.
233          *
234          * We know that llog_current_log() will return a loghandle that is
235          * big enough to hold reclen, so all we care about is padding here.
236          */
237         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
238         if (buf)
239                 reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
240                          sizeof(struct llog_rec_tail);
241
242         /* NOTE: padding is a record, but no bit is set */
243         if (left != 0 && left != reclen &&
244             left < (reclen + LLOG_MIN_REC_SIZE)) {
245                 loghandle->lgh_last_idx++;
246                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
247                 if (rc)
248                         RETURN(rc);
249                 /* if it's the last idx in log file, then return -ENOSPC */
250                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
251                         RETURN(-ENOSPC);
252         }
253
254         loghandle->lgh_last_idx++;
255         index = loghandle->lgh_last_idx;
256         rec->lrh_index = cpu_to_le32(index);
257         if (buf == NULL) {
258                 lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
259                 lrt->lrt_len = rec->lrh_len;
260                 lrt->lrt_index = rec->lrh_index;
261         }
262         if (ext2_set_bit(index, llh->llh_bitmap)) {
263                 CERROR("argh, index %u already set in log bitmap?\n", index);
264                 LBUG(); /* should never happen */
265         }
266         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
267         llh->llh_tail.lrt_index = cpu_to_le32(index);
268
269         offset = 0;
270         rc = llog_lvfs_write_blob(ctxt, file, &llh->llh_hdr, NULL, 0);
271         if (rc)
272                 RETURN(rc);
273
274         CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
275                loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
276                file->f_pos);
277
278         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
279         if (rc)
280                 RETURN(rc);
281
282         if (rc == 0 && reccookie) {
283                 if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
284                         LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
285                                  "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
286                                  LPU64"/"LPU64"\n",
287                                  reccookie->lgc_lgl.lgl_oid,
288                                  reccookie->lgc_lgl.lgl_ogr,
289                                  loghandle->lgh_id.lgl_oid,
290                                  loghandle->lgh_id.lgl_oid);
291                         LASSERTF(reccookie->lgc_index == index,
292                                  "lgc_index %u != index %u\n",
293                                  reccookie->lgc_index, index);
294                 } else {
295                         reccookie->lgc_lgl = loghandle->lgh_id;
296                         reccookie->lgc_index = index;
297                         llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
298                 }
299
300                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
301                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
302                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
303                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
304                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
305                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
306                 else
307                         reccookie->lgc_subsys = -1;
308                 rc = 1;
309         }
310         if (rc == 0 && (le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC ||
311             le32_to_cpu(rec->lrh_type) == SMFS_UPDATE_REC))
312                 rc = 1;
313
314         RETURN(rc);
315 }
316
317 /* We can skip reading at least as many log blocks as the number of
318 * minimum sized log records we are skipping.  If it turns out
319 * that we are not far enough along the log (because the
320 * actual records are larger than minimum size) we just skip
321 * some more records. */
322
323 static void llog_skip_over(__u64 *off, int curr, int goal)
324 {
325         if (goal <= curr)
326                 return;
327         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
328                 ~(LLOG_CHUNK_SIZE - 1);
329 }
330
331 /* sets:
332  *  - curr_offset to the furthest point read in the log file
333  *  - curr_idx to the log index preceeding curr_offset
334  * returns -EIO/-EINVAL on error
335  */
336 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
337                                 int next_idx, __u64 *curr_offset, void *buf,
338                                 int len)
339 {
340         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
341         ENTRY;
342
343         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
344                 RETURN(-EINVAL);
345
346         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
347                next_idx, *curr_idx, *curr_offset);
348
349         while (*curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
350                 struct llog_rec_hdr *rec;
351                 struct llog_rec_tail *tail;
352                 loff_t ppos;
353                 int nbytes, rc;
354
355                 llog_skip_over(curr_offset, *curr_idx, next_idx);
356
357                 ppos = *curr_offset;
358                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
359                                              buf, len, &ppos);
360
361                 if (rc) {
362                         CERROR("Cant read llog block at log id "LPU64
363                                "/%u offset "LPU64"\n",
364                                loghandle->lgh_id.lgl_oid,
365                                loghandle->lgh_id.lgl_ogen,
366                                *curr_offset);
367                         RETURN(rc);
368                 }
369
370                 nbytes = ppos - *curr_offset;
371                 *curr_offset = ppos;
372
373                 if (nbytes == 0) /* end of file, nothing to do */
374                         RETURN(0);
375
376                 if (nbytes < sizeof(*tail)) {
377                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
378                                LPU64"\n", loghandle->lgh_id.lgl_oid,
379                                loghandle->lgh_id.lgl_ogen, *curr_offset);
380                         RETURN(-EINVAL);
381                 }
382
383                 tail = buf + nbytes - sizeof(struct llog_rec_tail);
384                 *curr_idx = le32_to_cpu(tail->lrt_index);
385
386                 /* this shouldn't happen */
387                 if (tail->lrt_index == 0) {
388                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
389                                LPU64"\n", loghandle->lgh_id.lgl_oid,
390                                loghandle->lgh_id.lgl_ogen, *curr_offset);
391                         RETURN(-EINVAL);
392                 }
393                 if (le32_to_cpu(tail->lrt_index) < next_idx)
394                         continue;
395
396                 /* sanity check that the start of the new buffer is no farther
397                  * than the record that we wanted.  This shouldn't happen. */
398                 rec = buf;
399                 if (le32_to_cpu(rec->lrh_index) > next_idx) {
400                         CERROR("missed desired record? %u > %u\n",
401                                le32_to_cpu(rec->lrh_index), next_idx);
402                         RETURN(-ENOENT);
403                 }
404                 RETURN(0);
405         }
406         RETURN(-EIO);
407 }
408
409 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
410                                 int prev_idx, void *buf, int len)
411 {
412         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
413         __u64 curr_offset;
414         int rc;
415         ENTRY;
416
417         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
418                 RETURN(-EINVAL);
419
420         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
421
422         curr_offset = LLOG_CHUNK_SIZE;
423         llog_skip_over(&curr_offset, 0, prev_idx);
424
425         while (curr_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
426                 struct llog_rec_hdr *rec;
427                 struct llog_rec_tail *tail;
428                 loff_t ppos;
429
430                 ppos = curr_offset;
431                 rc = llog_fsfilt_read_record(ctxt, loghandle->lgh_file,
432                                              buf, len, &ppos);
433
434                 if (rc) {
435                         CERROR("Cant read llog block at log id "LPU64
436                                "/%u offset "LPU64"\n",
437                                loghandle->lgh_id.lgl_oid,
438                                loghandle->lgh_id.lgl_ogen,
439                                curr_offset);
440                         RETURN(rc);
441                 }
442
443                 /* put number of bytes read into rc to make code simpler */
444                 rc = ppos - curr_offset;
445                 curr_offset = ppos;
446
447                 if (rc == 0) /* end of file, nothing to do */
448                         RETURN(0);
449
450                 if (rc < sizeof(*tail)) {
451                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
452                                LPU64"\n", loghandle->lgh_id.lgl_oid,
453                                loghandle->lgh_id.lgl_ogen, curr_offset);
454                         RETURN(-EINVAL);
455                 }
456
457                 tail = buf + rc - sizeof(struct llog_rec_tail);
458
459                 /* this shouldn't happen */
460                 if (tail->lrt_index == 0) {
461                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
462                                LPU64"\n", loghandle->lgh_id.lgl_oid,
463                                loghandle->lgh_id.lgl_ogen, curr_offset);
464                         RETURN(-EINVAL);
465                 }
466                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
467                         continue;
468
469                 /* sanity check that the start of the new buffer is no farther
470                  * than the record that we wanted.  This shouldn't happen. */
471                 rec = buf;
472                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
473                         CERROR("missed desired record? %u > %u\n",
474                                le32_to_cpu(rec->lrh_index), prev_idx);
475                         RETURN(-ENOENT);
476                 }
477                 RETURN(0);
478         }
479         RETURN(-EIO);
480 }
481
482 static struct file *llog_filp_open(char *name, int flags, int mode)
483 {
484         char *logname;
485         struct file *filp;
486         int len;
487
488         OBD_ALLOC(logname, PATH_MAX);
489         if (logname == NULL)
490                 return ERR_PTR(-ENOMEM);
491
492         len = snprintf(logname, PATH_MAX, "LOGS/%s", name);
493         if (len >= PATH_MAX - 1) {
494                 filp = ERR_PTR(-ENAMETOOLONG);
495         } else {
496                 filp = l_filp_open(logname, flags, mode);
497                 if (IS_ERR(filp)) {
498                         CERROR("logfile %s(%s): %ld\n",
499                                flags & O_CREAT ? "create" : "open", logname,
500                                PTR_ERR(filp));
501                 }
502         }
503
504         OBD_FREE(logname, PATH_MAX);
505         return filp;
506 }
507
508 /* creates object for the case when we have no obd (smfs). */
509 static struct file *
510 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
511 {
512         struct file *filp;
513         int rc = 0;
514         ENTRY;
515
516         LASSERT(lgh_id != NULL);
517         if (lgh_id->lgl_oid) {
518                 struct dentry *dchild;
519                 char fidname[LL_FID_NAMELEN];
520                 int fidlen = 0;
521
522                 down(&ctxt->loc_objects_dir->d_inode->i_sem);
523                 fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
524                 dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
525                 if (IS_ERR(dchild)) {
526                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
527                         RETURN((struct file *)dchild);
528                 }
529                 if (dchild->d_inode == NULL) {
530                         struct dentry_params dp;
531                         struct inode *inode;
532
533                         dchild->d_fsdata = (void *) &dp;
534                         dp.p_ptr = NULL;
535                         dp.p_inum = lgh_id->lgl_oid;
536                         rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
537                                            dchild, S_IFREG, NULL);
538                         if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
539                                 dchild->d_fsdata = NULL;
540                         if (rc) {
541                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
542                                 dput(dchild);
543                                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
544                                 RETURN(ERR_PTR(rc));
545                         }
546                         inode = dchild->d_inode;
547                         LASSERT(inode->i_ino == lgh_id->lgl_oid);
548                         inode->i_generation = lgh_id->lgl_ogen;
549                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
550                                inode->i_ino, inode->i_generation);
551                         mark_inode_dirty(inode);
552                 }
553
554                 mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
555                 filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
556                                     O_RDWR | O_LARGEFILE);
557                 if (IS_ERR(filp)) {
558                         dput(dchild);
559                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
560                         RETURN(filp);
561                 }
562                 if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
563                         CERROR("%s is not a regular file!: mode = %o\n", fidname,
564                                filp->f_dentry->d_inode->i_mode);
565                         filp_close(filp, 0);
566                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
567                         RETURN(ERR_PTR(-ENOENT));
568                 }
569
570                 up(&ctxt->loc_objects_dir->d_inode->i_sem);
571                 RETURN(filp);
572
573         } else {
574                 unsigned int tmpname = ll_insecure_random_int();
575                 char fidname[LL_FID_NAMELEN];
576                 struct dentry *new_child, *parent;
577                 void *handle;
578                 int err, namelen;
579
580                 sprintf(fidname, "OBJECTS/%u", tmpname);
581                 filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
582                 if (IS_ERR(filp)) {
583                         rc = PTR_ERR(filp);
584                         if (rc == -EEXIST) {
585                                 CERROR("impossible object name collision %u\n",
586                                         tmpname);
587                                 LBUG();
588                         }
589                         CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
590                         RETURN(filp);
591                 }
592
593                 namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
594                                      filp->f_dentry->d_inode->i_generation);
595                 parent = filp->f_dentry->d_parent;
596                 down(&parent->d_inode->i_sem);
597                 new_child = lookup_one_len(fidname, parent, namelen);
598                 if (IS_ERR(new_child)) {
599                         CERROR("getting neg dentry for obj rename: %d\n", rc);
600                         GOTO(out_close, rc = PTR_ERR(new_child));
601                 }
602                 if (new_child->d_inode != NULL) {
603                         CERROR("impossible non-negative obj dentry %lu:%u!\n",
604                                 filp->f_dentry->d_inode->i_ino,
605                                 filp->f_dentry->d_inode->i_generation);
606                         LBUG();
607                 }
608
609                 handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
610                 if (IS_ERR(handle))
611                         GOTO(out_dput, rc = PTR_ERR(handle));
612
613                 lock_kernel();
614                 rc = vfs_rename(parent->d_inode, filp->f_dentry,
615                                 parent->d_inode, new_child);
616                 unlock_kernel();
617                 if (rc)
618                         CERROR("error renaming new object %lu:%u: rc %d\n",
619                                 filp->f_dentry->d_inode->i_ino,
620                                 filp->f_dentry->d_inode->i_generation, rc);
621
622                 err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
623                 if (!rc)
624                         rc = err;
625
626         out_dput:
627                 dput(new_child);
628         out_close:
629                 up(&parent->d_inode->i_sem);
630                 if (rc) {
631                         filp_close(filp, 0);
632                         filp = ERR_PTR(rc);
633                 } else {
634                         /* FIXME: is this group 1 is correct? */
635                         lgh_id->lgl_ogr = 1;
636                         lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
637                         lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
638                 }
639                 RETURN(filp);
640         }
641 }
642
643 /* creates object for generic case (obd exists) */
644 static struct file *
645 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
646 {
647         struct file *filp = NULL;
648         struct dentry *dchild;
649         struct obd_device *obd;
650         struct obdo *oa = NULL;
651         int open_flags = O_RDWR | O_LARGEFILE;
652         int rc = 0;
653         ENTRY;
654
655         obd = ctxt->loc_exp->exp_obd;
656         LASSERT(obd != NULL);
657
658         if (lgh_id->lgl_oid) {
659                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
660                                              lgh_id->lgl_ogen, lgh_id->lgl_ogr);
661                 if (IS_ERR(dchild) == -ENOENT) {
662                         OBD_ALLOC(oa, sizeof(*oa));
663                         if (!oa)
664                                 RETURN(ERR_PTR(-ENOMEM));
665
666                         oa->o_id = lgh_id->lgl_oid;
667                         oa->o_generation = lgh_id->lgl_ogen;
668                         oa->o_gr = lgh_id->lgl_ogr;
669                         oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
670                         rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
671                         if (rc) {
672                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
673                                 GOTO(out_free_oa, rc);
674                         }
675                         CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
676                                lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
677
678                         dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
679                                                      lgh_id->lgl_ogen, lgh_id->lgl_ogr);
680                 } else if (IS_ERR(dchild)) {
681                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
682                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
683                         RETURN((struct file *)dchild);
684                 }
685
686                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
687                 if (IS_ERR(filp)) {
688                         l_dput(dchild);
689                         rc = PTR_ERR(filp);
690                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
691                                lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
692                 }
693                 GOTO(out_free_oa, rc);
694         } else {
695                 /* this is important to work here over obd_create() as it manages 
696                   groups and we need it. Yet another reason is that mds_obd_create()
697                  is fully the same as old version of this function and this helps
698                  us to avoid code duplicating and layering violating. */
699                 OBD_ALLOC(oa, sizeof(*oa));
700                 if (!oa)
701                         RETURN(ERR_PTR(-ENOMEM));
702
703                 oa->o_gr = FILTER_GROUP_LLOG;
704                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
705                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
706                 if (rc)
707                         GOTO(out_free_oa, rc);
708
709                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
710                                              oa->o_generation, oa->o_gr);
711                 if (IS_ERR(dchild))
712                         GOTO(out_free_oa, rc = PTR_ERR(dchild));
713
714                 filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
715                                      open_flags);
716                 if (IS_ERR(filp)) {
717                         l_dput(dchild);
718                         GOTO(out_free_oa, rc = PTR_ERR(filp));
719                 }
720
721                 /* group 1 is not longer valid, we use the group which is set 
722                 by obd_create()->mds_obd_create(). */
723                 lgh_id->lgl_ogr = oa->o_gr;
724                 lgh_id->lgl_oid = oa->o_id;
725                 lgh_id->lgl_ogen = oa->o_generation;
726         }
727
728 out_free_oa:
729         if (rc)
730                 filp = ERR_PTR(rc);
731         if (oa)
732                 OBD_FREE(oa, sizeof(*oa));
733         RETURN(filp);
734 }
735
736 static struct file *
737 llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
738 {
739         if (ctxt->loc_alone)
740                 return llog_object_create_alone(ctxt, lgh_id);
741         else
742                 return llog_object_create_generic(ctxt, lgh_id);
743 }
744
745 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
746                                 struct dentry *dentry)
747 {
748         struct dentry *new_child;
749         char fidname[LL_FID_NAMELEN];
750         void *handle;
751         int namelen, rc = 0, err;
752         ENTRY;
753         
754         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
755         down(&ctxt->loc_objects_dir->d_inode->i_sem);
756         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
757         if (IS_ERR(new_child)) {
758                 CERROR("getting neg dentry for obj rename: %d\n", rc);
759                 GOTO(out, rc = PTR_ERR(new_child));
760         }
761         if (new_child->d_inode == dentry->d_inode)
762                 GOTO(out_dput, rc);
763         if (new_child->d_inode != NULL) {
764                 CERROR("impossible non-negative obj dentry "LPX64":%u!\n",
765                        logid.lgl_oid, logid.lgl_ogen);
766                 LBUG();
767         }
768         handle = llog_fsfilt_start(ctxt, ctxt->loc_objects_dir->d_inode,
769                                    FSFILT_OP_LINK, NULL);
770         if (IS_ERR(handle))
771                 GOTO(out_dput, rc = PTR_ERR(handle));
772         
773         lock_kernel();
774         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
775         unlock_kernel();
776         if (rc)
777                 CERROR("error link new object "LPX64":%u: rc %d\n",
778                        logid.lgl_oid, logid.lgl_ogen, rc);
779         err = llog_fsfilt_commit(ctxt, ctxt->loc_objects_dir->d_inode, handle, 0);
780 out_dput:
781         l_dput(new_child);
782 out:
783         up(&ctxt->loc_objects_dir->d_inode->i_sem);
784         RETURN(rc);
785 }
786
787 static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
788                           struct llog_logid *logid, char *name, int flags)
789 {
790         struct llog_handle *handle;
791         struct lvfs_run_ctxt saved;
792         int rc = 0;
793         int open_flags = O_RDWR | O_LARGEFILE;
794         ENTRY;
795
796         if (flags & OBD_LLOG_FL_CREATE)
797                 open_flags |= O_CREAT;
798
799         handle = llog_alloc_handle();
800         if (handle == NULL)
801                 RETURN(-ENOMEM);
802         *res = handle;
803         
804         LASSERT(ctxt);
805         if (ctxt->loc_lvfs_ctxt)
806                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
807         
808         if (logid != NULL) {
809                 handle->lgh_file = llog_object_create(ctxt, logid);
810                 if (IS_ERR(handle->lgh_file)) {
811                         CERROR("cannot create/open llog object "LPX64":%x "
812                                "error = %ld", logid->lgl_oid, logid->lgl_ogen,
813                                PTR_ERR(handle->lgh_file));
814                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
815                 }
816                 handle->lgh_id = *logid;
817
818         } else if (name) {
819                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
820                 if (IS_ERR(handle->lgh_file)) {
821                         CERROR("cannot open %s file, error = %ld\n", 
822                                name, PTR_ERR(handle->lgh_file));
823                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
824                 }
825                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
826                 
827                 handle->lgh_id.lgl_ogr = 1;
828                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
829                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
830                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
831                 if (rc)
832                         GOTO(cleanup, rc);
833
834         } else {
835                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
836                 if (IS_ERR(handle->lgh_file)) {
837                         CERROR("cannot create llog object, error = %ld\n", 
838                                PTR_ERR(handle->lgh_file));
839                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
840                 }
841         }
842
843         handle->lgh_ctxt = ctxt;
844 finish:
845         if (ctxt->loc_lvfs_ctxt)
846                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
847         RETURN(rc);
848 cleanup:
849         llog_free_handle(handle);
850         goto finish;
851 }
852
853 static int llog_lvfs_close(struct llog_handle *handle)
854 {
855         int rc;
856         ENTRY;
857
858         rc = filp_close(handle->lgh_file, 0);
859         if (rc)
860                 CERROR("error closing log: rc %d\n", rc);
861         RETURN(rc);
862 }
863
864 static int llog_lvfs_destroy(struct llog_handle *loghandle)
865 {
866         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
867         struct lvfs_run_ctxt saved;
868         struct dentry *fdentry;
869         struct inode *parent_inode;
870         char fidname[LL_FID_NAMELEN];
871         void *handle;
872         int rc = -EINVAL, err, namelen;
873         ENTRY;
874         
875         if (ctxt->loc_lvfs_ctxt)
876                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
877         
878         fdentry = loghandle->lgh_file->f_dentry;
879         parent_inode = fdentry->d_parent->d_inode;
880         
881         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
882                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
883                 
884                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
885                                      fdentry->d_inode->i_generation);
886                 dget(fdentry);
887                 rc = llog_lvfs_close(loghandle);
888                 if (rc) {
889                         dput(fdentry);
890                         GOTO(out, rc);
891                 }
892                 
893                 handle = llog_fsfilt_start(ctxt, parent_inode,
894                                            FSFILT_OP_UNLINK, NULL);
895                 if (IS_ERR(handle)) {
896                         dput(fdentry);
897                         GOTO(out, rc = PTR_ERR(handle));
898                 }
899                 
900                 down(&parent_inode->i_sem);
901                 rc = vfs_unlink(parent_inode, fdentry);
902                 up(&parent_inode->i_sem);
903                 dput(fdentry);
904                 
905                 if (!rc) {
906                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
907                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
908                                                  namelen);
909                         if (fdentry == NULL || fdentry->d_inode == NULL) {
910                                 CERROR("destroy non_existent object %s\n", fidname);
911                                 GOTO(out_err, rc = IS_ERR(fdentry) ?
912                                      PTR_ERR(fdentry) : -ENOENT);
913                         }
914                         rc = vfs_unlink(ctxt->loc_objects_dir->d_inode, fdentry);
915                         l_dput(fdentry);
916 out_err:
917                         up(&ctxt->loc_objects_dir->d_inode->i_sem);
918                 }
919                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
920                 if (err && !rc)
921                         err = rc;
922                 
923                 GOTO(out, rc);
924         }
925         if (ctxt->loc_alone) {
926                 if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
927                         LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
928                         
929                         dget(fdentry);
930                         rc = llog_lvfs_close(loghandle);
931                         if (rc == 0) {
932                                 down(&parent_inode->i_sem);
933                                 rc = vfs_unlink(parent_inode, fdentry);
934                                 up(&parent_inode->i_sem);
935                         }
936                         dput(fdentry);
937                 }
938         } else {
939                 struct obdo *oa = NULL;
940  
941                 OBD_ALLOC(oa, sizeof(*oa));
942                 if (!oa)
943                         GOTO(out, rc = -ENOMEM);
944                 
945                 oa->o_id = loghandle->lgh_id.lgl_oid;
946                 oa->o_gr = loghandle->lgh_id.lgl_ogr;
947                 oa->o_generation = loghandle->lgh_id.lgl_ogen;
948                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
949                 
950                 rc = llog_lvfs_close(loghandle);
951                 if (rc)
952                         GOTO(out_free_oa, rc);
953                 
954                 rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
955 out_free_oa:
956                 OBD_FREE(oa, sizeof(*oa));
957         }
958 out:
959         if (ctxt->loc_lvfs_ctxt)
960                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
961         RETURN(rc);
962 }
963
964 /* reads the catalog list */
965 int llog_get_cat_list(struct lvfs_run_ctxt *ctxt,
966                       struct fsfilt_operations *fsops, char *name,
967                       int count, struct llog_catid *idarray)
968 {
969         struct lvfs_run_ctxt saved;
970         struct l_file *file;
971         int size = sizeof(*idarray) * count;
972         loff_t off = 0;
973         int rc;
974
975         LASSERT(count);
976
977         if (ctxt)
978                 push_ctxt(&saved, ctxt, NULL);
979         file = l_filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
980         if (!file || IS_ERR(file)) {
981                 rc = PTR_ERR(file);
982                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
983                        name, rc);
984                 GOTO(out, rc);
985         }
986
987         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
988                 CERROR("%s is not a regular file!: mode = %o\n", name,
989                        file->f_dentry->d_inode->i_mode);
990                 GOTO(out, rc = -ENOENT);
991         }
992
993         rc = fsops->fs_read_record(file, idarray, size, &off);
994         if (rc) {
995                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
996                        name, rc);
997                 GOTO(out, rc);
998         }
999
1000  out:
1001         if (file && !IS_ERR(file))
1002                 rc = filp_close(file, 0);
1003         if (ctxt)
1004                 pop_ctxt(&saved, ctxt, NULL);
1005         RETURN(rc);
1006 }
1007 EXPORT_SYMBOL(llog_get_cat_list);
1008
1009 /* writes the cat list */
1010 int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
1011                       struct fsfilt_operations *fsops, char *name,
1012                       int count, struct llog_catid *idarray)
1013 {
1014         struct lvfs_run_ctxt saved;
1015         struct l_file *file;
1016         int size = sizeof(*idarray) * count;
1017         loff_t off = 0;
1018         int rc;
1019
1020         LASSERT(count);
1021
1022         if (ctxt)
1023                 push_ctxt(&saved, ctxt, NULL);
1024         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
1025         if (!file || IS_ERR(file)) {
1026                 rc = PTR_ERR(file);
1027                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
1028                        name, rc);
1029                 GOTO(out, rc);
1030         }
1031
1032         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1033                 CERROR("%s is not a regular file!: mode = %o\n", name,
1034                        file->f_dentry->d_inode->i_mode);
1035                 GOTO(out, rc = -ENOENT);
1036         }
1037
1038         rc = fsops->fs_write_record(file, idarray, size, &off, 1);
1039         if (rc) {
1040                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
1041                        name, rc);
1042                 GOTO(out, rc);
1043         }
1044
1045  out:
1046         if (file && !IS_ERR(file))
1047                 rc = filp_close(file, 0);
1048         if (ctxt)
1049                 pop_ctxt(&saved, ctxt, NULL);
1050         RETURN(rc);
1051 }
1052 EXPORT_SYMBOL(llog_put_cat_list);
1053
1054 struct llog_operations llog_lvfs_ops = {
1055         lop_open:        llog_lvfs_open,
1056         lop_destroy:     llog_lvfs_destroy,
1057         lop_close:       llog_lvfs_close,
1058         lop_read_header: llog_lvfs_read_header,
1059         lop_write_rec:   llog_lvfs_write_rec,
1060         lop_next_block:  llog_lvfs_next_block,
1061         lop_prev_block:  llog_lvfs_prev_block,
1062 };
1063 EXPORT_SYMBOL(llog_lvfs_ops);
1064
1065 #else /* !__KERNEL__ */
1066
/*
 * Non-kernel (!__KERNEL__) placeholder for reading a log header: it only
 * invokes LBUG() and returns 0; the handle is not examined.
 */
static int
llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1072
/*
 * Non-kernel (!__KERNEL__) placeholder for writing a log record: it only
 * invokes LBUG() and returns 0; none of the arguments are used.
 */
static int
llog_lvfs_write_rec(struct llog_handle *loghandle, struct llog_rec_hdr *rec,
                    struct llog_cookie *reccookie, int cookiecount,
                    void *buf, int idx)
{
        LBUG();
        return 0;
}
1081
/*
 * Non-kernel (!__KERNEL__) placeholder for opening a log: it only invokes
 * LBUG() and returns 0.  Note that *res is never written here.
 */
static int
llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
               struct llog_logid *logid, char *name, int flags)
{
        LBUG();
        return 0;
}
1088
/*
 * Non-kernel (!__KERNEL__) placeholder for closing a log: it only invokes
 * LBUG() and returns 0; the handle is not examined.
 */
static int
llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1094
/*
 * Non-kernel (!__KERNEL__) placeholder for destroying a log: it only invokes
 * LBUG() and returns 0; the handle is not examined.
 */
static int
llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();
        return 0;
}
1100
/*
 * Non-kernel (!__KERNEL__) placeholder for reading the catalog list: it only
 * invokes LBUG() and returns 0; idarray is not filled in.
 */
int
llog_get_cat_list(struct lvfs_run_ctxt *ctxt, struct fsfilt_operations *fsops,
                  char *name, int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
1108
/*
 * Non-kernel (!__KERNEL__) placeholder for writing the catalog list: it only
 * invokes LBUG() and returns 0; nothing is written.
 */
int
llog_put_cat_list(struct lvfs_run_ctxt *ctxt, struct fsfilt_operations *fsops,
                  char *name, int count, struct llog_catid *idarray)
{
        LBUG();
        return 0;
}
1116
/*
 * Non-kernel (!__KERNEL__) placeholder for fetching the previous log block:
 * it only invokes LBUG() and returns 0; buf is not touched.
 */
int
llog_lvfs_prev_block(struct llog_handle *loghandle, int prev_idx,
                     void *buf, int len)
{
        LBUG();
        return 0;
}
1123
1124 int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
1125                          int next_idx, __u64 *offset, void *buf, int len)
1126 {
1127         LBUG();
1128         return 0;
1129 }
1130
1131 struct llog_operations llog_lvfs_ops = {
1132         lop_open:        llog_lvfs_open,
1133         lop_destroy:     llog_lvfs_destroy,
1134         lop_close:       llog_lvfs_close,
1135         lop_read_header: llog_lvfs_read_header,
1136         lop_write_rec:   llog_lvfs_write_rec,
1137         lop_next_block:  llog_lvfs_next_block,
1138         lop_prev_block:  llog_lvfs_prev_block,
1139 };
1140 #endif