Whamcloud - gitweb
LU-1302 llog: pass lu_env to the llog callback
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/obdclass/llog_lvfs.c
35  *
36  * OST<->MDS recovery logging infrastructure.
37  * Invariants in implementation:
38  * - we do not share logs among different OST<->MDS connections, so that
39  *   if an OST or MDS fails it need only look at log(s) relevant to itself
40  *
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_log.h>
53 #include <obd_ost.h>
54 #include <libcfs/list.h>
55 #include <lvfs.h>
56 #include <lustre_fsfilt.h>
57 #include <lustre_disk.h>
58 #include "llog_internal.h"
59
60 #if defined(__KERNEL__) && defined(LLOG_LVFS)
61
62 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
63                                 int len, int index)
64 {
65         struct llog_rec_hdr rec = { 0 };
66         struct llog_rec_tail tail;
67         int rc;
68         ENTRY;
69
70         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
71
72         tail.lrt_len = rec.lrh_len = len;
73         tail.lrt_index = rec.lrh_index = index;
74         rec.lrh_type = LLOG_PAD_MAGIC;
75
76         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
77         if (rc) {
78                 CERROR("error writing padding record: rc %d\n", rc);
79                 goto out;
80         }
81
82         file->f_pos += len - sizeof(rec) - sizeof(tail);
83         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
84         if (rc) {
85                 CERROR("error writing padding record: rc %d\n", rc);
86                 goto out;
87         }
88
89  out:
90         RETURN(rc);
91 }
92
93 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
94                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
95 {
96         int rc;
97         struct llog_rec_tail end;
98         loff_t saved_off = file->f_pos;
99         int buflen = rec->lrh_len;
100
101         ENTRY;
102
103         file->f_pos = off;
104
105         if (buflen == 0)
106                 CWARN("0-length record\n");
107
108         if (!buf) {
109                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
110                 if (rc) {
111                         CERROR("error writing log record: rc %d\n", rc);
112                         goto out;
113                 }
114                 GOTO(out, rc = 0);
115         }
116
117         /* the buf case */
118         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
119         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
120         if (rc) {
121                 CERROR("error writing log hdr: rc %d\n", rc);
122                 goto out;
123         }
124
125         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
126         if (rc) {
127                 CERROR("error writing log buffer: rc %d\n", rc);
128                 goto out;
129         }
130
131         end.lrt_len = rec->lrh_len;
132         end.lrt_index = rec->lrh_index;
133         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
134         if (rc) {
135                 CERROR("error writing log tail: rc %d\n", rc);
136                 goto out;
137         }
138
139         rc = 0;
140  out:
141         if (saved_off > file->f_pos)
142                 file->f_pos = saved_off;
143         LASSERT(rc <= 0);
144         RETURN(rc);
145 }
146
147 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
148                                 void *buf, int size, loff_t off)
149 {
150         loff_t offset = off;
151         int rc;
152         ENTRY;
153
154         rc = fsfilt_read_record(obd, file, buf, size, &offset);
155         if (rc) {
156                 CERROR("error reading log record: rc %d\n", rc);
157                 RETURN(rc);
158         }
159         RETURN(0);
160 }
161
162 static int llog_lvfs_read_header(struct llog_handle *handle)
163 {
164         struct obd_device *obd;
165         int rc;
166         ENTRY;
167
168         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
169
170         obd = handle->lgh_ctxt->loc_exp->exp_obd;
171
172         if (i_size_read(handle->lgh_file->f_dentry->d_inode) == 0) {
173                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
174                 RETURN(LLOG_EEMPTY);
175         }
176
177         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
178                                  LLOG_CHUNK_SIZE, 0);
179         if (rc) {
180                 CERROR("error reading log header from %.*s\n",
181                        handle->lgh_file->f_dentry->d_name.len,
182                        handle->lgh_file->f_dentry->d_name.name);
183         } else {
184                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
185
186                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
187                         lustre_swab_llog_hdr(handle->lgh_hdr);
188
189                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
190                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
191                                handle->lgh_file->f_dentry->d_name.len,
192                                handle->lgh_file->f_dentry->d_name.name,
193                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
194                         rc = -EIO;
195                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
196                         CERROR("incorrectly sized log %.*s header: %#x "
197                                "(expected %#x)\n",
198                                handle->lgh_file->f_dentry->d_name.len,
199                                handle->lgh_file->f_dentry->d_name.name,
200                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
201                         CERROR("you may need to re-run lconf --write_conf.\n");
202                         rc = -EIO;
203                 }
204         }
205
206         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
207         handle->lgh_file->f_pos = i_size_read(handle->lgh_file->f_dentry->d_inode);
208
209         RETURN(rc);
210 }
211
212 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
213 /* appends if idx == -1, otherwise overwrites record idx. */
214 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
215                                struct llog_rec_hdr *rec,
216                                struct llog_cookie *reccookie, int cookiecount,
217                                void *buf, int idx)
218 {
219         struct llog_log_hdr *llh;
220         int reclen = rec->lrh_len, index, rc;
221         struct llog_rec_tail *lrt;
222         struct obd_device *obd;
223         struct file *file;
224         size_t left;
225         ENTRY;
226
227         llh = loghandle->lgh_hdr;
228         file = loghandle->lgh_file;
229         obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
230
231         /* record length should not bigger than LLOG_CHUNK_SIZE */
232         if (buf)
233                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
234                       sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
235         else
236                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
237         if (rc)
238                 RETURN(rc);
239
240         if (buf)
241                 /* write_blob adds header and tail to lrh_len. */
242                 reclen = sizeof(*rec) + rec->lrh_len +
243                          sizeof(struct llog_rec_tail);
244
245         if (idx != -1) {
246                 loff_t saved_offset;
247
248                 /* no header: only allowed to insert record 1 */
249                 if (idx != 1 && !i_size_read(file->f_dentry->d_inode)) {
250                         CERROR("idx != -1 in empty log\n");
251                         LBUG();
252                 }
253
254                 if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
255                         RETURN(-EINVAL);
256
257                 if (!ext2_test_bit(idx, llh->llh_bitmap))
258                         CERROR("Modify unset record %u\n", idx);
259                 if (idx != rec->lrh_index)
260                         CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
261
262                 rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
263                 /* we are done if we only write the header or on error */
264                 if (rc || idx == 0)
265                         RETURN(rc);
266
267                 /* Assumes constant lrh_len */
268                 saved_offset = sizeof(*llh) + (idx - 1) * reclen;
269
270                 if (buf) {
271                         struct llog_rec_hdr check;
272
273                         /* We assume that caller has set lgh_cur_* */
274                         saved_offset = loghandle->lgh_cur_offset;
275                         CDEBUG(D_OTHER,
276                                "modify record "LPX64": idx:%d/%u/%d, len:%u "
277                                "offset %llu\n",
278                                loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
279                                loghandle->lgh_cur_idx, rec->lrh_len,
280                                (long long)(saved_offset - sizeof(*llh)));
281                         if (rec->lrh_index != loghandle->lgh_cur_idx) {
282                                 CERROR("modify idx mismatch %u/%d\n",
283                                        idx, loghandle->lgh_cur_idx);
284                                 RETURN(-EFAULT);
285                         }
286 #if 1  /* FIXME remove this safety check at some point */
287                         /* Verify that the record we're modifying is the
288                            right one. */
289                         rc = llog_lvfs_read_blob(obd, file, &check,
290                                                  sizeof(check), saved_offset);
291                         if (check.lrh_index != idx || check.lrh_len != reclen) {
292                                 CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
293                                        idx, check.lrh_index, reclen,
294                                        check.lrh_len, rc);
295                                 RETURN(-EFAULT);
296                         }
297 #endif
298                 }
299
300                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
301                 if (rc == 0 && reccookie) {
302                         reccookie->lgc_lgl = loghandle->lgh_id;
303                         reccookie->lgc_index = idx;
304                         rc = 1;
305                 }
306                 RETURN(rc);
307         }
308
309         /* Make sure that records don't cross a chunk boundary, so we can
310          * process them page-at-a-time if needed.  If it will cross a chunk
311          * boundary, write in a fake (but referenced) entry to pad the chunk.
312          *
313          * We know that llog_current_log() will return a loghandle that is
314          * big enough to hold reclen, so all we care about is padding here.
315          */
316         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
317
318         /* NOTE: padding is a record, but no bit is set */
319         if (left != 0 && left != reclen &&
320             left < (reclen + LLOG_MIN_REC_SIZE)) {
321                  index = loghandle->lgh_last_idx + 1;
322                  rc = llog_lvfs_pad(obd, file, left, index);
323                  if (rc)
324                          RETURN(rc);
325                  loghandle->lgh_last_idx++; /*for pad rec*/
326          }
327          /* if it's the last idx in log file, then return -ENOSPC */
328          if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
329                  RETURN(-ENOSPC);
330         loghandle->lgh_last_idx++;
331         index = loghandle->lgh_last_idx;
332         LASSERT(index < LLOG_BITMAP_SIZE(llh));
333         rec->lrh_index = index;
334         if (buf == NULL) {
335                 lrt = (struct llog_rec_tail *)
336                         ((char *)rec + rec->lrh_len - sizeof(*lrt));
337                 lrt->lrt_len = rec->lrh_len;
338                 lrt->lrt_index = rec->lrh_index;
339         }
340         /*The caller should make sure only 1 process access the lgh_last_idx,
341          *Otherwise it might hit the assert.*/
342         LASSERT(index < LLOG_BITMAP_SIZE(llh));
343         if (ext2_set_bit(index, llh->llh_bitmap)) {
344                 CERROR("argh, index %u already set in log bitmap?\n", index);
345                 LBUG(); /* should never happen */
346         }
347         llh->llh_count++;
348         llh->llh_tail.lrt_index = index;
349
350         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
351         if (rc)
352                 RETURN(rc);
353
354         rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
355         if (rc)
356                 RETURN(rc);
357
358         CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u \n",
359                loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
360         if (rc == 0 && reccookie) {
361                 reccookie->lgc_lgl = loghandle->lgh_id;
362                 reccookie->lgc_index = index;
363                 if ((rec->lrh_type == MDS_UNLINK_REC) ||
364                     (rec->lrh_type == MDS_SETATTR64_REC))
365                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
366                 else if (rec->lrh_type == OST_SZ_REC)
367                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
368                 else
369                         reccookie->lgc_subsys = -1;
370                 rc = 1;
371         }
372         if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
373                 rc = 1;
374
375         RETURN(rc);
376 }
377
378 /* We can skip reading at least as many log blocks as the number of
379 * minimum sized log records we are skipping.  If it turns out
380 * that we are not far enough along the log (because the
381 * actual records are larger than minimum size) we just skip
382 * some more records. */
383
384 static void llog_skip_over(__u64 *off, int curr, int goal)
385 {
386         if (goal <= curr)
387                 return;
388         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
389                 ~(LLOG_CHUNK_SIZE - 1);
390 }
391
392
393 /* sets:
394  *  - cur_offset to the furthest point read in the log file
395  *  - cur_idx to the log index preceeding cur_offset
396  * returns -EIO/-EINVAL on error
397  */
398 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
399                                 int next_idx, __u64 *cur_offset, void *buf,
400                                 int len)
401 {
402         int rc;
403         ENTRY;
404
405         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
406                 RETURN(-EINVAL);
407
408         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
409                next_idx, *cur_idx, *cur_offset);
410
411         while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
412                 struct llog_rec_hdr *rec;
413                 struct llog_rec_tail *tail;
414                 loff_t ppos;
415
416                 llog_skip_over(cur_offset, *cur_idx, next_idx);
417
418                 ppos = *cur_offset;
419                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
420                                         loghandle->lgh_file, buf, len,
421                                         &ppos);
422                 if (rc) {
423                         CERROR("Cant read llog block at log id "LPU64
424                                "/%u offset "LPU64"\n",
425                                loghandle->lgh_id.lgl_oid,
426                                loghandle->lgh_id.lgl_ogen,
427                                *cur_offset);
428                         RETURN(rc);
429                 }
430
431                 /* put number of bytes read into rc to make code simpler */
432                 rc = ppos - *cur_offset;
433                 *cur_offset = ppos;
434
435                 if (rc < len) {
436                         /* signal the end of the valid buffer to llog_process */
437                         memset(buf + rc, 0, len - rc);
438                 }
439
440                 if (rc == 0) /* end of file, nothing to do */
441                         RETURN(0);
442
443                 if (rc < sizeof(*tail)) {
444                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
445                                LPU64"\n", loghandle->lgh_id.lgl_oid,
446                                loghandle->lgh_id.lgl_ogen, *cur_offset);
447                         RETURN(-EINVAL);
448                 }
449
450                 rec = buf;
451                 tail = (struct llog_rec_tail *)((char *)buf + rc -
452                                                 sizeof(struct llog_rec_tail));
453
454                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
455                         lustre_swab_llog_rec(rec);
456
457                 *cur_idx = tail->lrt_index;
458
459                 /* this shouldn't happen */
460                 if (tail->lrt_index == 0) {
461                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
462                                LPU64"\n", loghandle->lgh_id.lgl_oid,
463                                loghandle->lgh_id.lgl_ogen, *cur_offset);
464                         RETURN(-EINVAL);
465                 }
466                 if (tail->lrt_index < next_idx)
467                         continue;
468
469                 /* sanity check that the start of the new buffer is no farther
470                  * than the record that we wanted.  This shouldn't happen. */
471                 if (rec->lrh_index > next_idx) {
472                         CERROR("missed desired record? %u > %u\n",
473                                rec->lrh_index, next_idx);
474                         RETURN(-ENOENT);
475                 }
476                 RETURN(0);
477         }
478         RETURN(-EIO);
479 }
480
481 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
482                                 int prev_idx, void *buf, int len)
483 {
484         __u64 cur_offset;
485         int rc;
486         ENTRY;
487
488         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
489                 RETURN(-EINVAL);
490
491         CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
492
493         cur_offset = LLOG_CHUNK_SIZE;
494         llog_skip_over(&cur_offset, 0, prev_idx);
495
496         while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
497                 struct llog_rec_hdr *rec;
498                 struct llog_rec_tail *tail;
499                 loff_t ppos;
500
501                 ppos = cur_offset;
502
503                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
504                                         loghandle->lgh_file, buf, len,
505                                         &ppos);
506                 if (rc) {
507                         CERROR("Cant read llog block at log id "LPU64
508                                "/%u offset "LPU64"\n",
509                                loghandle->lgh_id.lgl_oid,
510                                loghandle->lgh_id.lgl_ogen,
511                                cur_offset);
512                         RETURN(rc);
513                 }
514
515                 /* put number of bytes read into rc to make code simpler */
516                 rc = ppos - cur_offset;
517                 cur_offset = ppos;
518
519                 if (rc == 0) /* end of file, nothing to do */
520                         RETURN(0);
521
522                 if (rc < sizeof(*tail)) {
523                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
524                                LPU64"\n", loghandle->lgh_id.lgl_oid,
525                                loghandle->lgh_id.lgl_ogen, cur_offset);
526                         RETURN(-EINVAL);
527                 }
528
529                 tail = buf + rc - sizeof(struct llog_rec_tail);
530
531                 /* this shouldn't happen */
532                 if (tail->lrt_index == 0) {
533                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
534                                LPU64"\n", loghandle->lgh_id.lgl_oid,
535                                loghandle->lgh_id.lgl_ogen, cur_offset);
536                         RETURN(-EINVAL);
537                 }
538                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
539                         continue;
540
541                 /* sanity check that the start of the new buffer is no farther
542                  * than the record that we wanted.  This shouldn't happen. */
543                 rec = buf;
544                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
545                         CERROR("missed desired record? %u > %u\n",
546                                le32_to_cpu(rec->lrh_index), prev_idx);
547                         RETURN(-ENOENT);
548                 }
549                 RETURN(0);
550         }
551         RETURN(-EIO);
552 }
553
554 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
555 {
556         char *logname;
557         struct file *filp;
558         int len;
559
560         OBD_ALLOC(logname, PATH_MAX);
561         if (logname == NULL)
562                 return ERR_PTR(-ENOMEM);
563
564         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
565         if (len >= PATH_MAX - 1) {
566                 filp = ERR_PTR(-ENAMETOOLONG);
567         } else {
568                 filp = l_filp_open(logname, flags, mode);
569                 if (IS_ERR(filp))
570                         CERROR("logfile creation %s: %ld\n", logname,
571                                PTR_ERR(filp));
572         }
573         OBD_FREE(logname, PATH_MAX);
574         return filp;
575 }
576
577 /* This is a callback from the llog_* functions.
578  * Assumes caller has already pushed us into the kernel context. */
579 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
580                             struct llog_logid *logid, char *name)
581 {
582         struct llog_handle *handle;
583         struct obd_device *obd;
584         struct l_dentry *dchild = NULL;
585         struct obdo *oa = NULL;
586         int rc = 0;
587         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
588         ENTRY;
589
590         handle = llog_alloc_handle();
591         if (handle == NULL)
592                 RETURN(-ENOMEM);
593         *res = handle;
594
595         LASSERT(ctxt);
596         LASSERT(ctxt->loc_exp);
597         obd = ctxt->loc_exp->exp_obd;
598
599         if (logid != NULL) {
600                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
601                                              logid->lgl_ogen, logid->lgl_oseq);
602
603                 if (IS_ERR(dchild)) {
604                         rc = PTR_ERR(dchild);
605                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
606                                logid->lgl_oid, logid->lgl_ogen, rc);
607                         GOTO(out, rc);
608                 }
609
610                 if (dchild->d_inode == NULL) {
611                         l_dput(dchild);
612                         rc = -ENOENT;
613                         CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
614                                logid->lgl_oid, logid->lgl_oseq, rc);
615                         GOTO(out, rc);
616                 }
617
618                 /* l_dentry_open will call dput(dchild) if there is an error */
619                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
620                                                     O_RDWR | O_LARGEFILE);
621                 if (IS_ERR(handle->lgh_file)) {
622                         rc = PTR_ERR(handle->lgh_file);
623                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
624                                logid->lgl_oid, logid->lgl_ogen, rc);
625                         GOTO(out, rc);
626                 }
627
628                 /* assign the value of lgh_id for handle directly */
629                 handle->lgh_id = *logid;
630
631         } else if (name) {
632                 handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
633                                                   name, open_flags, 0644);
634                 if (IS_ERR(handle->lgh_file))
635                         GOTO(out, rc = PTR_ERR(handle->lgh_file));
636
637                 handle->lgh_id.lgl_oseq = 1;
638                 handle->lgh_id.lgl_oid =
639                         handle->lgh_file->f_dentry->d_inode->i_ino;
640                 handle->lgh_id.lgl_ogen =
641                         handle->lgh_file->f_dentry->d_inode->i_generation;
642         } else {
643                 OBDO_ALLOC(oa);
644                 if (oa == NULL)
645                         GOTO(out, rc = -ENOMEM);
646
647                 oa->o_seq = FID_SEQ_LLOG;
648                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
649
650                 rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
651                 if (rc)
652                         GOTO(out, rc);
653
654                 /* FIXME: rationalize the misuse of o_generation in
655                  *        this API along with mds_obd_{create,destroy}.
656                  *        Hopefully it is only an internal API issue. */
657 #define o_generation o_parent_oid
658                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
659                                              oa->o_generation, oa->o_seq);
660
661                 if (IS_ERR(dchild))
662                         GOTO(out, rc = PTR_ERR(dchild));
663
664                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
665                                                  open_flags);
666                 if (IS_ERR(handle->lgh_file))
667                         GOTO(out, rc = PTR_ERR(handle->lgh_file));
668
669                 handle->lgh_id.lgl_oseq = oa->o_seq;
670                 handle->lgh_id.lgl_oid = oa->o_id;
671                 handle->lgh_id.lgl_ogen = oa->o_generation;
672         }
673
674         handle->lgh_ctxt = ctxt;
675 out:
676         if (rc)
677                 llog_free_handle(handle);
678
679         if (oa)
680                 OBDO_FREE(oa);
681         RETURN(rc);
682 }
683
684 static int llog_lvfs_close(struct llog_handle *handle)
685 {
686         int rc;
687         ENTRY;
688
689         rc = filp_close(handle->lgh_file, 0);
690         if (rc)
691                 CERROR("error closing log: rc %d\n", rc);
692         RETURN(rc);
693 }
694
695 static int llog_lvfs_destroy(struct llog_handle *handle)
696 {
697         struct dentry *fdentry;
698         struct obdo *oa;
699         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
700         char *dir;
701         void *th;
702         struct inode *inode;
703         int rc, rc1;
704         ENTRY;
705
706         dir = MOUNT_CONFIGS_DIR;
707
708         fdentry = handle->lgh_file->f_dentry;
709         inode = fdentry->d_parent->d_inode;
710         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
711                 struct lvfs_run_ctxt saved;
712                 struct vfsmount *mnt = mntget(handle->lgh_file->f_vfsmnt);
713
714                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
715                 dget(fdentry);
716                 rc = llog_lvfs_close(handle);
717
718                 if (rc == 0) {
719                         mutex_lock_nested(&inode->i_mutex, I_MUTEX_PARENT);
720                         rc = ll_vfs_unlink(inode, fdentry, mnt);
721                         mutex_unlock(&inode->i_mutex);
722                 }
723                 mntput(mnt);
724
725                 dput(fdentry);
726                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
727                 RETURN(rc);
728         }
729
730         OBDO_ALLOC(oa);
731         if (oa == NULL)
732                 RETURN(-ENOMEM);
733
734         oa->o_id = handle->lgh_id.lgl_oid;
735         oa->o_seq = handle->lgh_id.lgl_oseq;
736         oa->o_generation = handle->lgh_id.lgl_ogen;
737 #undef o_generation
738         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
739
740         rc = llog_lvfs_close(handle);
741         if (rc)
742                 GOTO(out, rc);
743
744         th = fsfilt_start_log(obd, inode, FSFILT_OP_UNLINK, NULL, 1);
745         if (IS_ERR(th)) {
746                 CERROR("fsfilt_start failed: %ld\n", PTR_ERR(th));
747                 GOTO(out, rc = PTR_ERR(th));
748         }
749
750         rc = obd_destroy(NULL, handle->lgh_ctxt->loc_exp, oa,
751                          NULL, NULL, NULL, NULL);
752
753         rc1 = fsfilt_commit(obd, inode, th, 0);
754         if (rc == 0 && rc1 != 0)
755                 rc = rc1;
756  out:
757         OBDO_FREE(oa);
758         RETURN(rc);
759 }
760
761 /* reads the catalog list */
762 int llog_get_cat_list(struct obd_device *disk_obd,
763                       char *name, int idx, int count, struct llog_catid *idarray)
764 {
765         struct lvfs_run_ctxt saved;
766         struct l_file *file;
767         int rc, rc1 = 0;
768         int size = sizeof(*idarray) * count;
769         loff_t off = idx *  sizeof(*idarray);
770         ENTRY;
771
772         if (!count)
773                 RETURN(0);
774
775         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
776         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
777         if (!file || IS_ERR(file)) {
778                 rc = PTR_ERR(file);
779                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
780                        name, rc);
781                 GOTO(out, rc);
782         }
783
784         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
785                 CERROR("%s is not a regular file!: mode = %o\n", name,
786                        file->f_dentry->d_inode->i_mode);
787                 GOTO(out, rc = -ENOENT);
788         }
789
790         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
791                (int)i_size_read(file->f_dentry->d_inode), size);
792
793         /* read for new ost index or for empty file */
794         memset(idarray, 0, size);
795         if (i_size_read(file->f_dentry->d_inode) < off)
796                 GOTO(out, rc = 0);
797
798         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
799         if (rc) {
800                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
801                 GOTO(out, rc);
802         }
803
804         EXIT;
805  out:
806         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
807         if (file && !IS_ERR(file))
808                 rc1 = filp_close(file, 0);
809         if (rc == 0)
810                 rc = rc1;
811         return rc;
812 }
813 EXPORT_SYMBOL(llog_get_cat_list);
814
815 /* writes the cat list */
816 int llog_put_cat_list(struct obd_device *disk_obd,
817                       char *name, int idx, int count, struct llog_catid *idarray)
818 {
819         struct lvfs_run_ctxt saved;
820         struct l_file *file;
821         int rc, rc1 = 0;
822         int size = sizeof(*idarray) * count;
823         loff_t off = idx * sizeof(*idarray);
824
825         if (!count)
826                 GOTO(out1, rc = 0);
827
828         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
829         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
830         if (!file || IS_ERR(file)) {
831                 rc = PTR_ERR(file);
832                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
833                        name, rc);
834                 GOTO(out, rc);
835         }
836
837         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
838                 CERROR("%s is not a regular file!: mode = %o\n", name,
839                        file->f_dentry->d_inode->i_mode);
840                 GOTO(out, rc = -ENOENT);
841         }
842
843         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
844         if (rc) {
845                 CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n",
846                        name, rc);
847                 GOTO(out, rc);
848         }
849
850 out:
851         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
852         if (file && !IS_ERR(file))
853                 rc1 = filp_close(file, 0);
854
855         if (rc == 0)
856                 rc = rc1;
857 out1:
858         RETURN(rc);
859 }
860 EXPORT_SYMBOL(llog_put_cat_list);
861
862 struct llog_operations llog_lvfs_ops = {
863         lop_write_rec:   llog_lvfs_write_rec,
864         lop_next_block:  llog_lvfs_next_block,
865         lop_prev_block:  llog_lvfs_prev_block,
866         lop_read_header: llog_lvfs_read_header,
867         lop_create:      llog_lvfs_create,
868         lop_destroy:     llog_lvfs_destroy,
869         lop_close:       llog_lvfs_close,
870         //        lop_cancel: llog_lvfs_cancel,
871 };
872
873 EXPORT_SYMBOL(llog_lvfs_ops);
874
875 #else /* !__KERNEL__ */
876
/*
 * !__KERNEL__ placeholder for the read-header operation.
 * The body only trips LBUG(); the 0 that follows just satisfies the
 * prototype (LBUG() is presumably fatal -- confirm).
 */
static int
llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
882
/*
 * !__KERNEL__ placeholder for the write-record operation.
 * None of the arguments are examined: the body only trips LBUG(), and the
 * 0 that follows just satisfies the prototype.
 */
static int
llog_lvfs_write_rec(struct llog_handle *loghandle,
                    struct llog_rec_hdr *rec,
                    struct llog_cookie *reccookie,
                    int cookiecount,
                    void *buf,
                    int idx)
{
        LBUG();

        return 0;
}
891
892 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
893                                 int next_idx, __u64 *cur_offset, void *buf,
894                                 int len)
895 {
896         LBUG();
897         return 0;
898 }
899
/*
 * !__KERNEL__ placeholder for the previous-block operation.
 * None of the arguments are examined: the body only trips LBUG(), and the
 * 0 that follows just satisfies the prototype.
 */
static int
llog_lvfs_prev_block(struct llog_handle *loghandle,
                     int prev_idx,
                     void *buf,
                     int len)
{
        LBUG();

        return 0;
}
906
/*
 * !__KERNEL__ placeholder for the create operation.
 * *res is never written: the body only trips LBUG(), and the 0 that
 * follows just satisfies the prototype.
 */
static int
llog_lvfs_create(struct llog_ctxt *ctxt,
                 struct llog_handle **res,
                 struct llog_logid *logid,
                 char *name)
{
        LBUG();

        return 0;
}
913
/*
 * !__KERNEL__ placeholder for the close operation.
 * The body only trips LBUG(); the 0 that follows just satisfies the
 * prototype.
 */
static int
llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
919
/*
 * !__KERNEL__ placeholder for the destroy operation.
 * The body only trips LBUG(); the 0 that follows just satisfies the
 * prototype.
 */
static int
llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
925
/*
 * !__KERNEL__ placeholder for reading the catalog list.
 * idarray is never filled in: the body only trips LBUG(), and the 0 that
 * follows just satisfies the prototype.
 */
int
llog_get_cat_list(struct obd_device *disk_obd,
                  char *name,
                  int idx,
                  int count,
                  struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
932
/*
 * !__KERNEL__ placeholder for writing the catalog list.
 * Nothing is written anywhere: the body only trips LBUG(), and the 0 that
 * follows just satisfies the prototype.
 */
int
llog_put_cat_list(struct obd_device *disk_obd,
                  char *name,
                  int idx,
                  int count,
                  struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
939
940 struct llog_operations llog_lvfs_ops = {
941         lop_write_rec:   llog_lvfs_write_rec,
942         lop_next_block:  llog_lvfs_next_block,
943         lop_prev_block:  llog_lvfs_prev_block,
944         lop_read_header: llog_lvfs_read_header,
945         lop_create:      llog_lvfs_create,
946         lop_destroy:     llog_lvfs_destroy,
947         lop_close:       llog_lvfs_close,
948 //        lop_cancel:      llog_lvfs_cancel,
949 };
950 #endif