Whamcloud - gitweb
b=11752
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging infrastructure.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 #define EXPORT_SYMTAB
36 #endif
37
38 #ifndef __KERNEL__
39 #include <liblustre.h>
40 #endif
41
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <obd_ost.h>
46 #include <libcfs/list.h>
47 #include <lvfs.h>
48 #include <lustre_fsfilt.h>
49 #include <lustre_disk.h>
50 #include "llog_internal.h"
51
52 #if defined(__KERNEL__) && defined(LLOG_LVFS)
53
54 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
55                                 int len, int index)
56 {
57         struct llog_rec_hdr rec = { 0 };
58         struct llog_rec_tail tail;
59         int rc;
60         ENTRY;
61
62         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
63
64         tail.lrt_len = rec.lrh_len = len;
65         tail.lrt_index = rec.lrh_index = index;
66         rec.lrh_type = LLOG_PAD_MAGIC;
67
68         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
69         if (rc) {
70                 CERROR("error writing padding record: rc %d\n", rc);
71                 goto out;
72         }
73
74         file->f_pos += len - sizeof(rec) - sizeof(tail);
75         rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
76         if (rc) {
77                 CERROR("error writing padding record: rc %d\n", rc);
78                 goto out;
79         }
80
81  out:
82         RETURN(rc);
83 }
84
85 static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
86                                 struct llog_rec_hdr *rec, void *buf, loff_t off)
87 {
88         int rc;
89         struct llog_rec_tail end;
90         loff_t saved_off = file->f_pos;
91         int buflen = rec->lrh_len;
92         ENTRY;
93
94         file->f_pos = off;
95
96         if (buflen == 0) 
97                 CWARN("0-length record\n");
98
99         if (!buf) {
100                 rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
101                 if (rc) {
102                         CERROR("error writing log record: rc %d\n", rc);
103                         goto out;
104                 }
105                 GOTO(out, rc = 0);
106         }
107
108         /* the buf case */
109         rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
110         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
111         if (rc) {
112                 CERROR("error writing log hdr: rc %d\n", rc);
113                 goto out;
114         }
115
116         rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
117         if (rc) {
118                 CERROR("error writing log buffer: rc %d\n", rc);
119                 goto out;
120         }
121
122         end.lrt_len = rec->lrh_len;
123         end.lrt_index = rec->lrh_index;
124         rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
125         if (rc) {
126                 CERROR("error writing log tail: rc %d\n", rc);
127                 goto out;
128         }
129
130         rc = 0;
131  out:
132         if (saved_off > file->f_pos)
133                 file->f_pos = saved_off;
134         LASSERT(rc <= 0);
135         RETURN(rc);
136 }
137
138 static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
139                                 void *buf, int size, loff_t off)
140 {
141         loff_t offset = off;
142         int rc;
143         ENTRY;
144
145         rc = fsfilt_read_record(obd, file, buf, size, &offset);
146         if (rc) {
147                 CERROR("error reading log record: rc %d\n", rc);
148                 RETURN(rc);
149         }
150         RETURN(0);
151 }
152
153 static int llog_lvfs_read_header(struct llog_handle *handle)
154 {
155         struct obd_device *obd;
156         int rc;
157         ENTRY;
158
159         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
160
161         obd = handle->lgh_ctxt->loc_exp->exp_obd;
162
163         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
164                 CDEBUG(D_HA, "not reading header from 0-byte log\n");
165                 RETURN(LLOG_EEMPTY);
166         }
167
168         rc = llog_lvfs_read_blob(obd, handle->lgh_file, handle->lgh_hdr,
169                                  LLOG_CHUNK_SIZE, 0);
170         if (rc) {
171                 CERROR("error reading log header from %.*s\n",
172                        handle->lgh_file->f_dentry->d_name.len,
173                        handle->lgh_file->f_dentry->d_name.name);
174         } else {
175                 struct llog_rec_hdr *llh_hdr = &handle->lgh_hdr->llh_hdr;
176
177                 if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
178                         lustre_swab_llog_hdr(handle->lgh_hdr);
179
180                 if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
181                         CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
182                                handle->lgh_file->f_dentry->d_name.len,
183                                handle->lgh_file->f_dentry->d_name.name,
184                                llh_hdr->lrh_type, LLOG_HDR_MAGIC);
185                         rc = -EIO;
186                 } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
187                         CERROR("incorrectly sized log %.*s header: %#x "
188                                "(expected %#x)\n",
189                                handle->lgh_file->f_dentry->d_name.len,
190                                handle->lgh_file->f_dentry->d_name.name,
191                                llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
192                         CERROR("you may need to re-run lconf --write_conf.\n");
193                         rc = -EIO;
194                 }
195         }
196
197         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
198         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
199
200         RETURN(rc);
201 }
202
203 /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */
204 /* appends if idx == -1, otherwise overwrites record idx. */
205 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
206                                struct llog_rec_hdr *rec,
207                                struct llog_cookie *reccookie, int cookiecount,
208                                void *buf, int idx)
209 {
210         struct llog_log_hdr *llh;
211         int reclen = rec->lrh_len, index, rc;
212         struct llog_rec_tail *lrt;
213         struct obd_device *obd;
214         struct file *file;
215         size_t left;
216         ENTRY;
217
218         llh = loghandle->lgh_hdr;
219         file = loghandle->lgh_file;
220         obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
221
222         /* record length should not bigger than LLOG_CHUNK_SIZE */
223         if (buf)
224                 rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
225                       sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
226         else
227                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
228         if (rc)
229                 RETURN(rc);
230
231         if (buf)
232                 /* write_blob adds header and tail to lrh_len. */ 
233                 reclen = sizeof(*rec) + rec->lrh_len + 
234                         sizeof(struct llog_rec_tail);
235
236         if (idx != -1) {
237                 loff_t saved_offset;
238
239                 /* no header: only allowed to insert record 1 */
240                 if (idx != 1 && !file->f_dentry->d_inode->i_size) {
241                         CERROR("idx != -1 in empty log\n");
242                         LBUG();
243                 }
244                 
245                 if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
246                         RETURN(-EINVAL);
247
248                 if (!ext2_test_bit(idx, llh->llh_bitmap)) 
249                         CERROR("Modify unset record %u\n", idx);
250                 if (idx != rec->lrh_index)
251                         CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
252
253                 rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
254                 /* we are done if we only write the header or on error */
255                 if (rc || idx == 0)
256                         RETURN(rc);
257
258                 /* Assumes constant lrh_len */
259                 saved_offset = sizeof(*llh) + (idx - 1) * reclen;
260
261                 if (buf) {
262                         struct llog_rec_hdr check;
263
264                         /* We assume that caller has set lgh_cur_* */
265                         saved_offset = loghandle->lgh_cur_offset;
266                         CDEBUG(D_OTHER,
267                                "modify record "LPX64": idx:%d/%u/%d, len:%u "
268                                "offset %llu\n",
269                                loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
270                                loghandle->lgh_cur_idx,
271                                rec->lrh_len, saved_offset - sizeof(*llh));
272                         if (rec->lrh_index != loghandle->lgh_cur_idx) {
273                                 CERROR("modify idx mismatch %u/%d\n",
274                                        idx, loghandle->lgh_cur_idx);
275                                 RETURN(-EFAULT);
276                         }
277 #if 1  /* FIXME remove this safety check at some point */
278                         /* Verify that the record we're modifying is the 
279                            right one. */
280                         rc = llog_lvfs_read_blob(obd, file, &check,
281                                                  sizeof(check), saved_offset);
282                         if (check.lrh_index != idx || check.lrh_len != reclen) {
283                                 CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
284                                        idx, check.lrh_index, reclen, 
285                                        check.lrh_len, rc);
286                                 RETURN(-EFAULT);
287                         }
288 #endif
289                 }
290
291                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
292                 if (rc == 0 && reccookie) {
293                         reccookie->lgc_lgl = loghandle->lgh_id;
294                         reccookie->lgc_index = idx;
295                         rc = 1;
296                 }
297                 RETURN(rc);
298         }
299
300         /* Make sure that records don't cross a chunk boundary, so we can
301          * process them page-at-a-time if needed.  If it will cross a chunk
302          * boundary, write in a fake (but referenced) entry to pad the chunk.
303          *
304          * We know that llog_current_log() will return a loghandle that is
305          * big enough to hold reclen, so all we care about is padding here.
306          */
307         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
308
309         /* NOTE: padding is a record, but no bit is set */
310         if (left != 0 && left != reclen &&
311             left < (reclen + LLOG_MIN_REC_SIZE)) {
312                 loghandle->lgh_last_idx++;
313                 rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
314                 if (rc)
315                         RETURN(rc);
316                 /* if it's the last idx in log file, then return -ENOSPC */
317                 if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
318                         RETURN(-ENOSPC);
319         }
320
321         loghandle->lgh_last_idx++;
322         index = loghandle->lgh_last_idx;
323         LASSERT(index < LLOG_BITMAP_SIZE(llh));
324         rec->lrh_index = index;
325         if (buf == NULL) {
326                 lrt = (struct llog_rec_tail *)
327                         ((char *)rec + rec->lrh_len - sizeof(*lrt));
328                 lrt->lrt_len = rec->lrh_len;
329                 lrt->lrt_index = rec->lrh_index;
330         }
331         if (ext2_set_bit(index, llh->llh_bitmap)) {
332                 CERROR("argh, index %u already set in log bitmap?\n", index);
333                 LBUG(); /* should never happen */
334         }
335         llh->llh_count++;
336         llh->llh_tail.lrt_index = index;
337
338         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
339         if (rc)
340                 RETURN(rc);
341
342         rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
343         if (rc)
344                 RETURN(rc);
345
346         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
347                loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
348         if (rc == 0 && reccookie) {
349                 reccookie->lgc_lgl = loghandle->lgh_id;
350                 reccookie->lgc_index = index;
351                 if ((rec->lrh_type == MDS_UNLINK_REC) || 
352                                 (rec->lrh_type == MDS_SETATTR_REC))
353                         reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
354                 else if (rec->lrh_type == OST_SZ_REC)
355                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
356                 else if (rec->lrh_type == OST_RAID1_REC)
357                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
358                 else
359                         reccookie->lgc_subsys = -1;
360                 rc = 1;
361         }
362         if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
363                 rc = 1;
364
365         RETURN(rc);
366 }
367
368 /* We can skip reading at least as many log blocks as the number of
369 * minimum sized log records we are skipping.  If it turns out
370 * that we are not far enough along the log (because the
371 * actual records are larger than minimum size) we just skip
372 * some more records. */
373
374 static void llog_skip_over(__u64 *off, int curr, int goal)
375 {
376         if (goal <= curr)
377                 return;
378         *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
379                 ~(LLOG_CHUNK_SIZE - 1);
380 }
381
382
383 /* sets:
384  *  - cur_offset to the furthest point read in the log file
385  *  - cur_idx to the log index preceeding cur_offset
386  * returns -EIO/-EINVAL on error
387  */
388 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
389                                 int next_idx, __u64 *cur_offset, void *buf,
390                                 int len)
391 {
392         int rc;
393         ENTRY;
394
395         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
396                 RETURN(-EINVAL);
397
398         CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
399                next_idx, *cur_idx, *cur_offset);
400
401         while (*cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
402                 struct llog_rec_hdr *rec;
403                 struct llog_rec_tail *tail;
404                 loff_t ppos;
405
406                 llog_skip_over(cur_offset, *cur_idx, next_idx);
407
408                 ppos = *cur_offset;
409                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
410                                         loghandle->lgh_file, buf, len,
411                                         &ppos);
412                 if (rc) {
413                         CERROR("Cant read llog block at log id "LPU64
414                                "/%u offset "LPU64"\n",
415                                loghandle->lgh_id.lgl_oid,
416                                loghandle->lgh_id.lgl_ogen,
417                                *cur_offset);
418                         RETURN(rc);
419                 }
420
421                 /* put number of bytes read into rc to make code simpler */
422                 rc = ppos - *cur_offset;
423                 *cur_offset = ppos;
424                 
425                 if (rc < len) {
426                         /* signal the end of the valid buffer to llog_process */
427                         memset(buf + rc, 0, len - rc);
428                 }
429
430                 if (rc == 0) /* end of file, nothing to do */
431                         RETURN(0);
432
433                 if (rc < sizeof(*tail)) {
434                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
435                                LPU64"\n", loghandle->lgh_id.lgl_oid,
436                                loghandle->lgh_id.lgl_ogen, *cur_offset);
437                         RETURN(-EINVAL);
438                 }
439
440                 rec = buf;
441                 tail = (struct llog_rec_tail *)((char *)buf + rc -
442                                                 sizeof(struct llog_rec_tail));
443
444                 if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) {
445                         lustre_swab_llog_rec(rec, tail);
446                 }
447
448                 *cur_idx = tail->lrt_index;
449
450                 /* this shouldn't happen */
451                 if (tail->lrt_index == 0) {
452                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
453                                LPU64"\n", loghandle->lgh_id.lgl_oid,
454                                loghandle->lgh_id.lgl_ogen, *cur_offset);
455                         RETURN(-EINVAL);
456                 }
457                 if (tail->lrt_index < next_idx)
458                         continue;
459
460                 /* sanity check that the start of the new buffer is no farther
461                  * than the record that we wanted.  This shouldn't happen. */
462                 if (rec->lrh_index > next_idx) {
463                         CERROR("missed desired record? %u > %u\n",
464                                rec->lrh_index, next_idx);
465                         RETURN(-ENOENT);
466                 }
467                 RETURN(0);
468         }
469         RETURN(-EIO);
470 }
471
472 static int llog_lvfs_prev_block(struct llog_handle *loghandle,
473                                 int prev_idx, void *buf, int len)
474 {
475         __u64 cur_offset;
476         int rc;
477         ENTRY;
478
479         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
480                 RETURN(-EINVAL);
481
482         CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
483
484         cur_offset = LLOG_CHUNK_SIZE;
485         llog_skip_over(&cur_offset, 0, prev_idx);
486
487         while (cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
488                 struct llog_rec_hdr *rec;
489                 struct llog_rec_tail *tail;
490                 loff_t ppos;
491
492                 ppos = cur_offset;
493
494                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
495                                         loghandle->lgh_file, buf, len,
496                                         &ppos);
497                 if (rc) {
498                         CERROR("Cant read llog block at log id "LPU64
499                                "/%u offset "LPU64"\n",
500                                loghandle->lgh_id.lgl_oid,
501                                loghandle->lgh_id.lgl_ogen,
502                                cur_offset);
503                         RETURN(rc);
504                 }
505
506                 /* put number of bytes read into rc to make code simpler */
507                 rc = ppos - cur_offset;
508                 cur_offset = ppos;
509
510                 if (rc == 0) /* end of file, nothing to do */
511                         RETURN(0);
512
513                 if (rc < sizeof(*tail)) {
514                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
515                                LPU64"\n", loghandle->lgh_id.lgl_oid,
516                                loghandle->lgh_id.lgl_ogen, cur_offset);
517                         RETURN(-EINVAL);
518                 }
519
520                 tail = buf + rc - sizeof(struct llog_rec_tail);
521
522                 /* this shouldn't happen */
523                 if (tail->lrt_index == 0) {
524                         CERROR("Invalid llog tail at log id "LPU64"/%u offset "
525                                LPU64"\n", loghandle->lgh_id.lgl_oid,
526                                loghandle->lgh_id.lgl_ogen, cur_offset);
527                         RETURN(-EINVAL);
528                 }
529                 if (le32_to_cpu(tail->lrt_index) < prev_idx)
530                         continue;
531
532                 /* sanity check that the start of the new buffer is no farther
533                  * than the record that we wanted.  This shouldn't happen. */
534                 rec = buf;
535                 if (le32_to_cpu(rec->lrh_index) > prev_idx) {
536                         CERROR("missed desired record? %u > %u\n",
537                                le32_to_cpu(rec->lrh_index), prev_idx);
538                         RETURN(-ENOENT);
539                 }
540                 RETURN(0);
541         }
542         RETURN(-EIO);
543 }
544
545 static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
546 {
547         char *logname;
548         struct file *filp;
549         int len;
550
551         OBD_ALLOC(logname, PATH_MAX);
552         if (logname == NULL)
553                 return ERR_PTR(-ENOMEM);
554
555         len = snprintf(logname, PATH_MAX, "%s/%s", dir, name);
556         if (len >= PATH_MAX - 1) {
557                 filp = ERR_PTR(-ENAMETOOLONG);
558         } else {
559                 filp = l_filp_open(logname, flags, mode);
560                 if (IS_ERR(filp))
561                         CERROR("logfile creation %s: %ld\n", logname,
562                                PTR_ERR(filp));
563         }
564         OBD_FREE(logname, PATH_MAX);
565         return filp;
566 }
567
568 /* This is a callback from the llog_* functions.
569  * Assumes caller has already pushed us into the kernel context. */
570 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
571                             struct llog_logid *logid, char *name)
572 {
573         struct llog_handle *handle;
574         struct obd_device *obd;
575         struct l_dentry *dchild = NULL;
576         struct obdo *oa = NULL;
577         int rc = 0, cleanup_phase = 1;
578         int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
579         ENTRY;
580
581         handle = llog_alloc_handle();
582         if (handle == NULL)
583                 RETURN(-ENOMEM);
584         *res = handle;
585
586         LASSERT(ctxt);
587         LASSERT(ctxt->loc_exp);
588         obd = ctxt->loc_exp->exp_obd;
589
590         if (logid != NULL) {
591                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, logid->lgl_oid,
592                                              logid->lgl_ogen, logid->lgl_ogr);
593
594                 if (IS_ERR(dchild)) {
595                         rc = PTR_ERR(dchild);
596                         CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
597                                logid->lgl_oid, logid->lgl_ogen, rc);
598                         GOTO(cleanup, rc);
599                 }
600
601                 cleanup_phase = 2;
602                 if (dchild->d_inode == NULL) {
603                         rc = -ENOENT;
604                         CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n",
605                                logid->lgl_oid, logid->lgl_ogr, rc);
606                         GOTO(cleanup, rc);
607                 }
608
609                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
610                                                     O_RDWR | O_LARGEFILE);
611                 if (IS_ERR(handle->lgh_file)) {
612                         rc = PTR_ERR(handle->lgh_file);
613                         CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
614                                logid->lgl_oid, logid->lgl_ogen, rc);
615                         GOTO(cleanup, rc);
616                 }
617
618                 /* assign the value of lgh_id for handle directly */
619                 handle->lgh_id = *logid;
620
621         } else if (name) {
622                 /* COMPAT_146 */
623                 if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0) {
624                         handle->lgh_file = llog_filp_open(MDT_LOGS_DIR, name, 
625                                                           open_flags, 0644);
626                 } else {
627                         /* end COMPAT_146 */
628                         handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
629                                                           name, open_flags, 
630                                                           0644);
631                 }
632                 if (IS_ERR(handle->lgh_file))
633                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
634
635                 handle->lgh_id.lgl_ogr = 1;
636                 handle->lgh_id.lgl_oid =
637                         handle->lgh_file->f_dentry->d_inode->i_ino;
638                 handle->lgh_id.lgl_ogen =
639                         handle->lgh_file->f_dentry->d_inode->i_generation;
640         } else {
641                 oa = obdo_alloc();
642                 if (oa == NULL)
643                         GOTO(cleanup, rc = -ENOMEM);
644
645                 oa->o_gr = FILTER_GROUP_LLOG;
646                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
647
648                 rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
649                 if (rc)
650                         GOTO(cleanup, rc);
651
652                 dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
653                                              oa->o_generation, oa->o_gr);
654
655                 if (IS_ERR(dchild))
656                         GOTO(cleanup, rc = PTR_ERR(dchild));
657                 cleanup_phase = 2;
658                 handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
659                                                  open_flags);
660                 if (IS_ERR(handle->lgh_file))
661                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
662
663                 handle->lgh_id.lgl_ogr = oa->o_gr;
664                 handle->lgh_id.lgl_oid = oa->o_id;
665                 handle->lgh_id.lgl_ogen = oa->o_generation;
666         }
667
668         handle->lgh_ctxt = ctxt;
669  finish:
670         if (oa)
671                 obdo_free(oa);
672         RETURN(rc);
673 cleanup:
674         switch (cleanup_phase) {
675         case 2:
676                 l_dput(dchild);
677         case 1:
678                 llog_free_handle(handle);
679         }
680         goto finish;
681 }
682
683 static int llog_lvfs_close(struct llog_handle *handle)
684 {
685         int rc;
686         ENTRY;
687
688         rc = filp_close(handle->lgh_file, 0);
689         if (rc)
690                 CERROR("error closing log: rc %d\n", rc);
691         RETURN(rc);
692 }
693
694 static int llog_lvfs_destroy(struct llog_handle *handle)
695 {
696         struct dentry *fdentry;
697         struct obdo *oa;
698         struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
699         char *dir;
700         int rc;
701         ENTRY;
702
703         /* COMPAT_146 */
704         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME) == 0)
705                 dir = MDT_LOGS_DIR;
706         else
707                 /* end COMPAT_146 */
708                 dir = MOUNT_CONFIGS_DIR;
709
710         fdentry = handle->lgh_file->f_dentry;
711         if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) {
712                 struct inode *inode = fdentry->d_parent->d_inode;
713                 struct lvfs_run_ctxt saved;
714
715                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
716                 dget(fdentry);
717                 rc = llog_lvfs_close(handle);
718
719                 if (rc == 0) {
720                         LOCK_INODE_MUTEX(inode);
721                         rc = vfs_unlink(inode, fdentry);
722                         UNLOCK_INODE_MUTEX(inode);
723                 }
724
725                 dput(fdentry);
726                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
727                 RETURN(rc);
728         }
729
730         oa = obdo_alloc();
731         if (oa == NULL)
732                 RETURN(-ENOMEM);
733
734         oa->o_id = handle->lgh_id.lgl_oid;
735         oa->o_gr = handle->lgh_id.lgl_ogr;
736         oa->o_generation = handle->lgh_id.lgl_ogen;
737         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
738
739         rc = llog_lvfs_close(handle);
740         if (rc)
741                 GOTO(out, rc);
742
743         rc = obd_destroy(handle->lgh_ctxt->loc_exp, oa, NULL, NULL, NULL);
744  out:
745         obdo_free(oa);
746         RETURN(rc);
747 }
748
749 /* reads the catalog list */
750 int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
751                       char *name, int count, struct llog_catid *idarray)
752 {
753         struct lvfs_run_ctxt saved;
754         struct l_file *file;
755         int rc;
756         int size = sizeof(*idarray) * count;
757         loff_t off = 0;
758         ENTRY;
759
760         if (!count) 
761                 RETURN(0);
762
763         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
764         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
765         if (!file || IS_ERR(file)) {
766                 rc = PTR_ERR(file);
767                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
768                        name, rc);
769                 GOTO(out, rc);
770         }
771         
772         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
773                 CERROR("%s is not a regular file!: mode = %o\n", name,
774                        file->f_dentry->d_inode->i_mode);
775                 GOTO(out, rc = -ENOENT);
776         }
777
778         CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n", 
779                (int)file->f_dentry->d_inode->i_size, size);
780
781         rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
782         if (rc) {
783                 CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
784                 GOTO(out, rc);
785         }
786
787         EXIT;
788  out:
789         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
790         if (file && !IS_ERR(file))
791                 rc = filp_close(file, 0);
792         return rc;
793 }
794 EXPORT_SYMBOL(llog_get_cat_list);
795
796 /* writes the cat list */
797 int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
798                       char *name, int count, struct llog_catid *idarray)
799 {
800         struct lvfs_run_ctxt saved;
801         struct l_file *file;
802         int rc;
803         int size = sizeof(*idarray) * count;
804         loff_t off = 0;
805
806         if (!count) 
807                 return (0);
808
809         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
810         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
811         if (!file || IS_ERR(file)) {
812                 rc = PTR_ERR(file);
813                 CERROR("OBD filter: cannot open/create %s: rc = %d\n",
814                        name, rc);
815                 GOTO(out, rc);
816         }
817
818         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
819                 CERROR("%s is not a regular file!: mode = %o\n", name,
820                        file->f_dentry->d_inode->i_mode);
821                 GOTO(out, rc = -ENOENT);
822         }
823
824         rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
825         if (rc) {
826                 CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
827                        name, rc);
828                 GOTO(out, rc);
829         }
830
831  out:
832         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
833         if (file && !IS_ERR(file))
834                 rc = filp_close(file, 0);
835         RETURN(rc);
836 }
837
838 struct llog_operations llog_lvfs_ops = {
839         lop_write_rec:   llog_lvfs_write_rec,
840         lop_next_block:  llog_lvfs_next_block,
841         lop_prev_block:  llog_lvfs_prev_block,
842         lop_read_header: llog_lvfs_read_header,
843         lop_create:      llog_lvfs_create,
844         lop_destroy:     llog_lvfs_destroy,
845         lop_close:       llog_lvfs_close,
846         //        lop_cancel: llog_lvfs_cancel,
847 };
848
849 EXPORT_SYMBOL(llog_lvfs_ops);
850
851 #else /* !__KERNEL__ */
852
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  It only trips LBUG().
 */
static int
llog_lvfs_read_header(struct llog_handle *handle)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
858
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  All arguments are ignored;
 * the function only trips LBUG().
 */
static int
llog_lvfs_write_rec(struct llog_handle *loghandle, struct llog_rec_hdr *rec,
                    struct llog_cookie *reccookie, int cookiecount,
                    void *buf, int idx)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
867
868 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
869                                 int next_idx, __u64 *cur_offset, void *buf,
870                                 int len)
871 {
872         LBUG();
873         return 0;
874 }
875
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  All arguments are ignored;
 * the function only trips LBUG().
 */
static int
llog_lvfs_prev_block(struct llog_handle *loghandle, int prev_idx, void *buf,
                     int len)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
882
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  *res is never written;
 * the function only trips LBUG().
 */
static int
llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                 struct llog_logid *logid, char *name)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
889
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  It only trips LBUG().
 */
static int
llog_lvfs_close(struct llog_handle *handle)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
895
/*
 * Placeholder used when the real lvfs llog code is compiled out (the #else
 * side of the __KERNEL__ && LLOG_LVFS guard).  It only trips LBUG().
 */
static int
llog_lvfs_destroy(struct llog_handle *handle)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
901
/*
 * Placeholder for the catalog list reader when the real lvfs llog code is
 * compiled out (the #else side of the __KERNEL__ && LLOG_LVFS guard).
 * Nothing is read and idarray is left untouched; the function only trips
 * LBUG().
 */
int
llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                  char *name, int count, struct llog_catid *idarray)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
908
/*
 * Placeholder for the catalog list writer when the real lvfs llog code is
 * compiled out (the #else side of the __KERNEL__ && LLOG_LVFS guard).
 * Nothing is written; the function only trips LBUG().
 */
int
llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                  char *name, int count, struct llog_catid *idarray)
{
        /* NOTE(review): LBUG() presumably does not return; the 0 below just
         * satisfies the int return type -- confirm */
        LBUG();

        return 0;
}
915
916 struct llog_operations llog_lvfs_ops = {
917         lop_write_rec:   llog_lvfs_write_rec,
918         lop_next_block:  llog_lvfs_next_block,
919         lop_prev_block:  llog_lvfs_prev_block,
920         lop_read_header: llog_lvfs_read_header,
921         lop_create:      llog_lvfs_create,
922         lop_destroy:     llog_lvfs_destroy,
923         lop_close:       llog_lvfs_close,
924 //        lop_cancel:      llog_lvfs_cancel,
925 };
926 #endif