Whamcloud - gitweb
* fixed copy/paste error in previous 11684 commit
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging infrastructure.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 #define EXPORT_SYMTAB
36 #endif
37
38 #ifndef __KERNEL__
39 #include <liblustre.h>
40 #endif
41
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <obd_ost.h>
46 #include <libcfs/list.h>
47 #include <lvfs.h>
48 #include <lustre_fsfilt.h>
49 #include <lustre_disk.h>
50 #include "llog_internal.h"
51
52 #if defined(__KERNEL__) && defined(LLOG_LVFS)
53
54 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
55                                 int len, int index)
56 {
57         struct llog_rec_hdr rec = { 0 };
58         struct llog_rec_tail tail;
59         int rc;
60         ENTRY;
61
62         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
63
64         tail.lrt_len = rec.lrh_len = len;
65         tail.lrt_index = rec.lrh_index = index;
66         rec.lrh_type = LLOG_PAD_MAGIC;
67
68         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
69         if (rc) {
70                 CERROR("error writing padding record: rc %d\n", rc);
71                 goto out;
72         }
73
74         file->f_pos += len - sizeof(rec) - sizeof(tail);
75         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
76         if (rc) {
77                 CERROR("error writing padding record: rc %d\n", rc);
78                 goto out;
79         }
80
81  out:
82         RETURN(rc);
83 }
84
85 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
86                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
87 {
88         int rc;
89         struct llog_rec_tail end;
90         loff_t saved_off = file->f_pos;
91         int buflen = rec->lrh_len;
92
93         ENTRY;
94         file->f_pos = off;
95
96         if (!buf) {
97                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
98                 if (rc) {
99                         CERROR("error writing log record: rc %d\n", rc);
100                         goto out;
101                 }
102                 GOTO(out, rc = 0);
103         }
104
105         /* the buf case */
106         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
107         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
108         if (rc) {
109                 CERROR("error writing log hdr: rc %d\n", rc);
110                 goto out;
111         }
112
113         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
114         if (rc) {
115                 CERROR("error writing log buffer: rc %d\n", rc);
116                 goto out;
117         }
118
119         end.lrt_len = rec->lrh_len;
120         end.lrt_index = rec->lrh_index;
121         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
122         if (rc) {
123                 CERROR("error writing log tail: rc %d\n", rc);
124                 goto out;
125         }
126
127         rc = 0;
128  out:
129         if (saved_off > file->f_pos)
130                 file->f_pos = saved_off;
131         LASSERT(rc <= 0);
132         RETURN(rc);
133 }
134
135 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
136                                 void *buf, int size, loff_t off)
137 {
138         loff_t offset = off;
139         int rc;
140         ENTRY;
141
142         rc = fsfilt_read_record(obd, file, buf, size, &offset);
143         if (rc) {
144                 CERROR("error reading log record: rc %d\n", rc);
145                 RETURN(rc);
146         }
147         RETURN(0);
148 }
149
150 static int llog_lvfs_read_header(struct llog_handle *handle)
151 {
152         struct obd_device *obd;
153         int rc;
154         ENTRY;
155
156         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
157
158         obd = handle->lgh_ctxt->loc_exp->exp_obd;
159
160         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
161                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
162                 RETURN(LLOG_EEMPTY);
163         }
164
165         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
166                                  LLOG_CHUNK_SIZE, 0);
167         if (rc) {
168                 CERROR("error reading log header from %.*s\n",
169                        handle->lgh_file->f_dentry->d_name.len,
170                        handle->lgh_file->f_dentry->d_name.name);
171         } else {
172                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
173
174                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
175                         lustre_swab_llog_hdr(handle->lgh_hdr);
176
177                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
178                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
179                                handle->lgh_file->f_dentry->d_name.len,
180                                handle->lgh_file->f_dentry->d_name.name,
181                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
182                         rc = -EIO;
183                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
184                         CERROR("incorrectly sized log %.*s header: %#x "
185                                "(expected %#x)\n",
186                                handle->lgh_file->f_dentry->d_name.len,
187                                handle->lgh_file->f_dentry->d_name.name,
188                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
189                         CERROR("you may need to re-run lconf --write_conf.\n");
190                         rc = -EIO;
191                 }
192         }
193
194         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
195         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
196
197         RETURN(rc);
198 }
199
200 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
201 /* appends if idx == -1, otherwise overwrites record idx. */
202 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
203                                struct llog_rec_hdr *rec,
204                                struct llog_cookie *reccookie, int cookiecount,
205                                void *buf, int idx)
206 {
207         struct llog_log_hdr *llh;
208         int reclen = rec->lrh_len, index, rc;
209         struct llog_rec_tail *lrt;
210         struct obd_device *obd;
211         struct file *file;
212         size_t left;
213         ENTRY;
214
215         llh = loghandle->lgh_hdr;
216         file = loghandle->lgh_file;
217         obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
218
219         /* record length should not bigger than LLOG_CHUNK_SIZE */
220         if (buf)
221                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
222                       sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
223         else
224                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
225         if (rc)
226                 RETURN(rc);
227
228         if (buf)
229                 /* write_blob adds header and tail to lrh_len. */ 
230                 reclen = sizeof(*rec) + rec->lrh_len + 
231                         sizeof(struct llog_rec_tail);
232
233         if (idx != -1) {
234                 loff_t saved_offset;
235
236                 /* no header: only allowed to insert record 1 */
237                 if (idx != 1 && !file->f_dentry->d_inode->i_size) {
238                         CERROR("idx != -1 in empty log\n");
239                         LBUG();
240                 }
241                 
242                 if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
243                         RETURN(-EINVAL);
244
245                 if (!ext2_test_bit(idx, llh->llh_bitmap)) 
246                         CERROR("Modify unset record %u\n", idx);
247                 if (idx != rec->lrh_index)
248                         CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
249
250                 rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
251                 /* we are done if we only write the header or on error */
252                 if (rc || idx == 0)
253                         RETURN(rc);
254
255                 /* Assumes constant lrh_len */
256                 saved_offset = sizeof(*llh) + (idx - 1) * reclen;
257
258                 if (buf) {
259                         struct llog_rec_hdr check;
260
261                         /* We assume that caller has set lgh_cur_* */
262                         saved_offset = loghandle->lgh_cur_offset;
263                         CDEBUG(D_OTHER,
264                                "modify record "LPX64": idx:%d/%u/%d, len:%u "
265                                "offset %llu\n",
266                                loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
267                                loghandle->lgh_cur_idx,
268                                rec->lrh_len, saved_offset - sizeof(*llh));
269                         if (rec->lrh_index != loghandle->lgh_cur_idx) {
270                                 CERROR("modify idx mismatch %u/%d\n",
271                                        idx, loghandle->lgh_cur_idx);
272                                 RETURN(-EFAULT);
273                         }
274 #if 1  /* FIXME remove this safety check at some point */
275                         /* Verify that the record we're modifying is the 
276                            right one. */
277                         rc = llog_lvfs_read_blob(obd, file, &check,
278                                                  sizeof(check), saved_offset);
279                         if (check.lrh_index != idx || check.lrh_len != reclen) {
280                                 CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
281                                        idx, check.lrh_index, reclen, 
282                                        check.lrh_len, rc);
283                                 RETURN(-EFAULT);
284                         }
285 #endif
286                 }
287
288                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
289                 if (rc == 0 && reccookie) {
290                         reccookie->lgc_lgl = loghandle->lgh_id;
291                         reccookie->lgc_index = idx;
292                         rc = 1;
293                 }
294                 RETURN(rc);
295         }
296
297         /* Make sure that records don't cross a chunk boundary, so we can
298          * process them page-at-a-time if needed.  If it will cross a chunk
299          * boundary, write in a fake (but referenced) entry to pad the chunk.
300          *
301          * We know that llog_current_log() will return a loghandle that is
302          * big enough to hold reclen, so all we care about is padding here.
303          */
304         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
305
306         /* NOTE: padding is a record, but no bit is set */
307         if (left != 0 && left != reclen &&
308             left < (reclen + LLOG_MIN_REC_SIZE)) {
309                 loghandle->lgh_last_idx++;
310                 rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
311                 if (rc)
312                         RETURN(rc);
313                 /* if it's the last idx in log file, then return -ENOSPC */
314                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
315                         RETURN(-ENOSPC);
316         }
317
318         loghandle->lgh_last_idx++;
319         index = loghandle->lgh_last_idx;
320         LASSERT(index < LLOG_BITMAP_SIZE(llh));
321         rec->lrh_index = index;
322         if (buf == NULL) {
323                 lrt = (struct llog_rec_tail *)
324                         ((char *)rec + rec->lrh_len - sizeof(*lrt));
325                 lrt->lrt_len = rec->lrh_len;
326                 lrt->lrt_index = rec->lrh_index;
327         }
328         if (ext2_set_bit(index, llh->llh_bitmap)) {
329                 CERROR("argh, index %u already set in log bitmap?\n", index);
330                 LBUG(); /* should never happen */
331         }
332         llh->llh_count++;
333         llh->llh_tail.lrt_index = index;
334
335         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
336         if (rc)
337                 RETURN(rc);
338
339         rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
340         if (rc)
341                 RETURN(rc);
342
343         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
344                loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
345         if (rc == 0 && reccookie) {
346                 reccookie->lgc_lgl = loghandle->lgh_id;
347                 reccookie->lgc_index = index;
348                 if ((rec->lrh_type == MDS_UNLINK_REC) || 
349                                 (rec->lrh_type == MDS_SETATTR_REC))
350                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
351                 else if (rec->lrh_type == OST_SZ_REC)
352                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
353                 else if (rec->lrh_type == OST_RAID1_REC)
354                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
355                 else
356                         reccookie->lgc_subsys = -1;
357                 rc = 1;
358         }
359         if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
360                 rc = 1;
361
362         RETURN(rc);
363 }
364
365 /* We can skip reading at least as many log blocks as the number of
366 * minimum sized log records we are skipping.  If it turns out
367 * that we are not far enough along the log (because the
368 * actual records are larger than minimum size) we just skip
369 * some more records. */
370
371 static void llog_skip_over(__u64 *off, int curr, int goal)
372 {
373         if (goal <= curr)
374                 return;
375         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
376                 ~(LLOG_CHUNK_SIZE - 1);
377 }
378
379
380 /* sets:
381  *  - cur_offset to the furthest point read in the log file
382  *  - cur_idx to the log index preceeding cur_offset
383  * returns -EIO/-EINVAL on error
384  */
385 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
386                                 int next_idx, __u64 *cur_offset, void *buf,
387                                 int len)
388 {
389         int rc;
390         ENTRY;
391
392         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
393                 RETURN(-EINVAL);
394
395         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
396                next_idx, *cur_idx, *cur_offset);
397
398         while (*cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
399                 struct llog_rec_hdr *rec;
400                 struct llog_rec_tail *tail;
401                 loff_t ppos;
402
403                 llog_skip_over(cur_offset, *cur_idx, next_idx);
404
405                 ppos = *cur_offset;
406                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
407                                         loghandle->lgh_file, buf, len,
408                                         &ppos);
409                 if (rc) {
410                         CERROR("Cant read llog block at log id "LPU64
411                                "/%u offset "LPU64"\n",
412                                loghandle->lgh_id.lgl_oid,
413                                loghandle->lgh_id.lgl_ogen,
414                                *cur_offset);
415                         RETURN(rc);
416                 }
417
418                 /* put number of bytes read into rc to make code simpler */
419                 rc = ppos - *cur_offset;
420                 *cur_offset = ppos;
421                 
422                 if (rc < len) {
423                         /* signal the end of the valid buffer to llog_process */
424                         memset(buf + rc, 0, len - rc);
425                 }
426
427                 if (rc == 0) /* end of file, nothing to do */
428                         RETURN(0);
429
430                 if (rc < sizeof(*tail)) {
431                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
432                                LPU64"\n", loghandle->lgh_id.lgl_oid,
433                                loghandle->lgh_id.lgl_ogen, *cur_offset);
434                         RETURN(-EINVAL);
435                 }
436
437                 rec = buf;
438                 tail = (struct llog_rec_tail *)((char *)buf + rc -
439                                                 sizeof(struct llog_rec_tail));
440
441                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) {
442                         lustre_swab_llog_rec(rec, tail);
443                 }
444
445                 *cur_idx = tail->lrt_index;
446
447                 /* this shouldn't happen */
448                 if (tail->lrt_index == 0) {
449                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
450                                LPU64"\n", loghandle->lgh_id.lgl_oid,
451                                loghandle->lgh_id.lgl_ogen, *cur_offset);
452                         RETURN(-EINVAL);
453                 }
454                 if (tail->lrt_index < next_idx)
455                         continue;
456
457                 /* sanity check that the start of the new buffer is no farther
458                  * than the record that we wanted.  This shouldn't happen. */
459                 if (rec->lrh_index > next_idx) {
460                         CERROR("missed desired record? %u > %u\n",
461                                rec->lrh_index, next_idx);
462                         RETURN(-ENOENT);
463                 }
464                 RETURN(0);
465         }
466         RETURN(-EIO);
467 }
468
469 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
470                                 int prev_idx, void *buf, int len)
471 {
472         __u64 cur_offset;
473         int rc;
474         ENTRY;
475
476         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
477                 RETURN(-EINVAL);
478
479         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
480
481         cur_offset = LLOG_CHUNK_SIZE;
482         llog_skip_over(&cur_offset, 0, prev_idx);
483
484         while (cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
485                 struct llog_rec_hdr *rec;
486                 struct llog_rec_tail *tail;
487                 loff_t ppos;
488
489                 ppos = cur_offset;
490
491                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
492                                         loghandle->lgh_file, buf, len,
493                                         &ppos);
494                 if (rc) {
495                         CERROR("Cant read llog block at log id "LPU64
496                                "/%u offset "LPU64"\n",
497                                loghandle->lgh_id.lgl_oid,
498                                loghandle->lgh_id.lgl_ogen,
499                                cur_offset);
500                         RETURN(rc);
501                 }
502
503                 /* put number of bytes read into rc to make code simpler */
504                 rc = ppos - cur_offset;
505                 cur_offset = ppos;
506
507                 if (rc == 0) /* end of file, nothing to do */
508                         RETURN(0);
509
510                 if (rc < sizeof(*tail)) {
511                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
512                                LPU64"\n", loghandle->lgh_id.lgl_oid,
513                                loghandle->lgh_id.lgl_ogen, cur_offset);
514                         RETURN(-EINVAL);
515                 }
516
517                 tail = buf + rc - sizeof(struct llog_rec_tail);
518
519                 /* this shouldn't happen */
520                 if (tail->lrt_index == 0) {
521                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
522                                LPU64"\n", loghandle->lgh_id.lgl_oid,
523                                loghandle->lgh_id.lgl_ogen, cur_offset);
524                         RETURN(-EINVAL);
525                 }
526                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
527                         continue;
528
529                 /* sanity check that the start of the new buffer is no farther
530                  * than the record that we wanted.  This shouldn't happen. */
531                 rec = buf;
532                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
533                         CERROR("missed desired record? %u > %u\n",
534                                le32_to_cpu(rec->lrh_index), prev_idx);
535                         RETURN(-ENOENT);
536                 }
537                 RETURN(0);
538         }
539         RETURN(-EIO);
540 }
541
542 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
543 {
544         char *logname;
545         struct file *filp;
546         int len;
547
548         OBD_ALLOC(logname, PATH_MAX);
549         if (logname == NULL)
550                 return ERR_PTR(-ENOMEM);
551
552         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
553         if (len >= PATH_MAX - 1) {
554                 filp = ERR_PTR(-ENAMETOOLONG);
555         } else {
556                 filp = l_filp_open(logname, flags, mode);
557                 if (IS_ERR(filp))
558                         CERROR("logfile creation %s: %ld\n", logname,
559                                PTR_ERR(filp));
560         }
561         OBD_FREE(logname, PATH_MAX);
562         return filp;
563 }
564
565 /* This is a callback from the llog_* functions.
566  * Assumes caller has already pushed us into the kernel context. */
567 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
568                             struct llog_logid *logid, char *name)
569 {
570         struct llog_handle *handle;
571         struct obd_device *obd;
572         struct l_dentry *dchild = NULL;
573         struct obdo *oa = NULL;
574         int rc = 0, cleanup_phase = 1;
575         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
576         ENTRY;
577
578         handle = llog_alloc_handle();
579         if (handle == NULL)
580                 RETURN(-ENOMEM);
581         *res = handle;
582
583         LASSERT(ctxt);
584         LASSERT(ctxt->loc_exp);
585         obd = ctxt->loc_exp->exp_obd;
586
587         if (logid != NULL) {
588                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
589                                              logid->lgl_ogen, logid->lgl_ogr);
590
591                 if (IS_ERR(dchild)) {
592                         rc = PTR_ERR(dchild);
593                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
594                                logid->lgl_oid, logid->lgl_ogen, rc);
595                         GOTO(cleanup, rc);
596                 }
597
598                 cleanup_phase = 2;
599                 if (dchild->d_inode == NULL) {
600                         rc = -ENOENT;
601                         CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
602                                logid->lgl_oid, logid->lgl_ogr, rc);
603                         GOTO(cleanup, rc);
604                 }
605
606                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
607                                                     O_RDWR | O_LARGEFILE);
608                 if (IS_ERR(handle->lgh_file)) {
609                         rc = PTR_ERR(handle->lgh_file);
610                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
611                                logid->lgl_oid, logid->lgl_ogen, rc);
612                         GOTO(cleanup, rc);
613                 }
614
615                 /* assign the value of lgh_id for handle directly */
616                 handle->lgh_id = *logid;
617
618         } else if (name) {
619                 /* COMPAT_146 */
620                 if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0) {
621                         handle->lgh_file = llog_filp_open(MDT_LOGS_DIR, name, 
622                                                           open_flags, 0644);
623                 } else {
624                         /* end COMPAT_146 */
625                         handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
626                                                           name, open_flags, 
627                                                           0644);
628                 }
629                 if (IS_ERR(handle->lgh_file))
630                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
631
632                 handle->lgh_id.lgl_ogr = 1;
633                 handle->lgh_id.lgl_oid =
634                         handle->lgh_file->f_dentry->d_inode->i_ino;
635                 handle->lgh_id.lgl_ogen =
636                         handle->lgh_file->f_dentry->d_inode->i_generation;
637         } else {
638                 oa = obdo_alloc();
639                 if (oa == NULL)
640                         GOTO(cleanup, rc = -ENOMEM);
641
642                 oa->o_gr = FILTER_GROUP_LLOG;
643                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
644
645                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
646                 if (rc)
647                         GOTO(cleanup, rc);
648
649                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
650                                              oa->o_generation, oa->o_gr);
651
652                 if (IS_ERR(dchild))
653                         GOTO(cleanup, rc = PTR_ERR(dchild));
654                 cleanup_phase = 2;
655                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
656                                                  open_flags);
657                 if (IS_ERR(handle->lgh_file))
658                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
659
660                 handle->lgh_id.lgl_ogr = oa->o_gr;
661                 handle->lgh_id.lgl_oid = oa->o_id;
662                 handle->lgh_id.lgl_ogen = oa->o_generation;
663         }
664
665         handle->lgh_ctxt = ctxt;
666  finish:
667         if (oa)
668                 obdo_free(oa);
669         RETURN(rc);
670 cleanup:
671         switch (cleanup_phase) {
672         case 2:
673                 l_dput(dchild);
674         case 1:
675                 llog_free_handle(handle);
676         }
677         goto finish;
678 }
679
680 static int llog_lvfs_close(struct llog_handle *handle)
681 {
682         int rc;
683         ENTRY;
684
685         rc = filp_close(handle->lgh_file, 0);
686         if (rc)
687                 CERROR("error closing log: rc %d\n", rc);
688         RETURN(rc);
689 }
690
691 static int llog_lvfs_destroy(struct llog_handle *handle)
692 {
693         struct dentry *fdentry;
694         struct obdo *oa;
695         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
696         char *dir;
697         int rc;
698         ENTRY;
699
700         /* COMPAT_146 */
701         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0)
702                 dir = MDT_LOGS_DIR;
703         else
704                 /* end COMPAT_146 */
705                 dir = MOUNT_CONFIGS_DIR;
706
707         fdentry = handle->lgh_file->f_dentry;
708         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
709                 struct inode *inode = fdentry->d_parent->d_inode;
710                 struct lvfs_run_ctxt saved;
711
712                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
713                 dget(fdentry);
714                 rc = llog_lvfs_close(handle);
715
716                 if (rc == 0) {
717                         LOCK_INODE_MUTEX(inode);
718                         rc = vfs_unlink(inode, fdentry);
719                         UNLOCK_INODE_MUTEX(inode);
720                 }
721
722                 dput(fdentry);
723                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
724                 RETURN(rc);
725         }
726
727         oa = obdo_alloc();
728         if (oa == NULL)
729                 RETURN(-ENOMEM);
730
731         oa->o_id = handle->lgh_id.lgl_oid;
732         oa->o_gr = handle->lgh_id.lgl_ogr;
733         oa->o_generation = handle->lgh_id.lgl_ogen;
734         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
735
736         rc = llog_lvfs_close(handle);
737         if (rc)
738                 GOTO(out, rc);
739
740         rc = obd_destroy(handle->lgh_ctxt->loc_exp, oa, NULL, NULL, NULL);
741  out:
742         obdo_free(oa);
743         RETURN(rc);
744 }
745
746 /* reads the catalog list */
747 int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
748                       char *name, int count, struct llog_catid *idarray)
749 {
750         struct lvfs_run_ctxt saved;
751         struct l_file *file;
752         int rc;
753         int size = sizeof(*idarray) * count;
754         loff_t off = 0;
755         ENTRY;
756
757         if (!count) 
758                 RETURN(0);
759
760         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
761         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
762         if (!file || IS_ERR(file)) {
763                 rc = PTR_ERR(file);
764                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
765                        name, rc);
766                 GOTO(out, rc);
767         }
768         
769         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
770                 CERROR("%s is not a regular file!: mode = %o\n", name,
771                        file->f_dentry->d_inode->i_mode);
772                 GOTO(out, rc = -ENOENT);
773         }
774
775         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n", 
776                (int)file->f_dentry->d_inode->i_size, size);
777
778         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
779         if (rc) {
780                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
781                 GOTO(out, rc);
782         }
783
784         EXIT;
785  out:
786         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
787         if (file && !IS_ERR(file))
788                 rc = filp_close(file, 0);
789         return rc;
790 }
791 EXPORT_SYMBOL(llog_get_cat_list);
792
793 /* writes the cat list */
794 int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
795                       char *name, int count, struct llog_catid *idarray)
796 {
797         struct lvfs_run_ctxt saved;
798         struct l_file *file;
799         int rc;
800         int size = sizeof(*idarray) * count;
801         loff_t off = 0;
802
803         if (!count) 
804                 return (0);
805
806         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
807         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
808         if (!file || IS_ERR(file)) {
809                 rc = PTR_ERR(file);
810                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
811                        name, rc);
812                 GOTO(out, rc);
813         }
814
815         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
816                 CERROR("%s is not a regular file!: mode = %o\n", name,
817                        file->f_dentry->d_inode->i_mode);
818                 GOTO(out, rc = -ENOENT);
819         }
820
821         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
822         if (rc) {
823                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
824                        name, rc);
825                 GOTO(out, rc);
826         }
827
828  out:
829         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
830         if (file && !IS_ERR(file))
831                 rc = filp_close(file, 0);
832         RETURN(rc);
833 }
834
835 struct llog_operations llog_lvfs_ops = {
836         lop_write_rec:   llog_lvfs_write_rec,
837         lop_next_block:  llog_lvfs_next_block,
838         lop_prev_block:  llog_lvfs_prev_block,
839         lop_read_header: llog_lvfs_read_header,
840         lop_create:      llog_lvfs_create,
841         lop_destroy:     llog_lvfs_destroy,
842         lop_close:       llog_lvfs_close,
843         //        lop_cancel: llog_lvfs_cancel,
844 };
845
846 EXPORT_SYMBOL(llog_lvfs_ops);
847
848 #else /* !__KERNEL__ */
849
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; "handle" is not used.
 */
static int llog_lvfs_read_header(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
855
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; none of the arguments are used.
 */
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie,
                               int cookiecount,
                               void *buf,
                               int idx)
{
        LBUG();

        return 0;
}
864
865 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
866                                 int next_idx, __u64 *cur_offset, void *buf,
867                                 int len)
868 {
869         LBUG();
870         return 0;
871 }
872
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; none of the arguments are used.
 */
static int llog_lvfs_prev_block(struct llog_handle *loghandle, int prev_idx,
                                void *buf, int len)
{
        LBUG();

        return 0;
}
879
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; "*res" is left untouched.
 */
static int llog_lvfs_create(struct llog_ctxt *ctxt,
                            struct llog_handle **res,
                            struct llog_logid *logid,
                            char *name)
{
        LBUG();

        return 0;
}
886
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; "handle" is not used.
 */
static int llog_lvfs_close(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
892
/*
 * !__KERNEL__ stub: no implementation is provided in this build.
 * Calls LBUG() and then returns 0; "handle" is not used.
 */
static int llog_lvfs_destroy(struct llog_handle *handle)
{
        LBUG();

        return 0;
}
898
/*
 * !__KERNEL__ stub of llog_get_cat_list(): no implementation is provided in
 * this build.  Calls LBUG() and then returns 0; "idarray" is not filled in.
 */
int llog_get_cat_list(struct obd_device *obd,
                      struct obd_device *disk_obd,
                      char *name,
                      int count,
                      struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
905
/*
 * !__KERNEL__ stub of llog_put_cat_list(): no implementation is provided in
 * this build.  Calls LBUG() and then returns 0; nothing is written.
 */
int llog_put_cat_list(struct obd_device *obd,
                      struct obd_device *disk_obd,
                      char *name,
                      int count,
                      struct llog_catid *idarray)
{
        LBUG();

        return 0;
}
912
913 struct llog_operations llog_lvfs_ops = {
914         lop_write_rec:   llog_lvfs_write_rec,
915         lop_next_block:  llog_lvfs_next_block,
916         lop_prev_block:  llog_lvfs_prev_block,
917         lop_read_header: llog_lvfs_read_header,
918         lop_create:      llog_lvfs_create,
919         lop_destroy:     llog_lvfs_destroy,
920         lop_close:       llog_lvfs_close,
921 //        lop_cancel:      llog_lvfs_cancel,
922 };
923 #endif